From c7a37bf5f0d3774c262b2574ed287344de22c297 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Wed, 10 Dec 2025 11:58:53 +0200 Subject: [PATCH] try to fix chart udates - wip --- ANNOTATE/core/data_loader.py | 20 +- ANNOTATE/core/inference_training_system.py | 21 +- ANNOTATE/core/real_training_adapter.py | 10 +- ANNOTATE/web/app.py | 101 +------ ANNOTATE/web/static/js/chart_manager.js | 263 ++++++++++++++---- .../web/static/js/live_updates_polling.js | 43 ++- ANNOTATE/web/static/js/live_updates_ws.js | 7 +- core/cob_integration.py | 38 ++- core/data_provider.py | 31 ++- 9 files changed, 364 insertions(+), 170 deletions(-) diff --git a/ANNOTATE/core/data_loader.py b/ANNOTATE/core/data_loader.py index 3ee1928..9484241 100644 --- a/ANNOTATE/core/data_loader.py +++ b/ANNOTATE/core/data_loader.py @@ -88,7 +88,12 @@ class HistoricalDataLoader: try: # FORCE refresh for 1s/1m if requesting latest data OR incremental update - force_refresh = (timeframe in ['1s', '1m'] and (bypass_cache or (not start_time and not end_time))) + # Also force refresh for live updates (small limit + direction='latest' + no time range) + is_live_update = (direction == 'latest' and not start_time and not end_time and limit <= 5) + force_refresh = (timeframe in ['1s', '1m'] and (bypass_cache or (not start_time and not end_time))) or is_live_update + + if is_live_update: + logger.debug(f"Live update detected for {symbol} {timeframe} (limit={limit}, direction={direction}) - forcing refresh") # Try to get data from DataProvider's cached data first (most efficient) if hasattr(self.data_provider, 'cached_data'): @@ -279,6 +284,19 @@ class HistoricalDataLoader: limit=limit, allow_stale_cache=True ) + elif is_live_update: + # For live updates, use get_latest_candles which combines cached + real-time data + logger.debug(f"Getting live candles (cached + real-time) for {symbol} {timeframe}") + df = self.data_provider.get_latest_candles( + symbol=symbol, + timeframe=timeframe, + limit=limit + ) + + # Log the latest candle timestamp to help debug stale data + if df is not None and not df.empty: + latest_timestamp = df.index[-1] if hasattr(df.index, '__getitem__') else df.iloc[-1].name + logger.debug(f"Live update for {symbol} {timeframe}: latest candle at {latest_timestamp}") else: # Fetch from API and store in DuckDB (no time range specified) # For 1s/1m, logging every request is too verbose, use debug diff --git a/ANNOTATE/core/inference_training_system.py b/ANNOTATE/core/inference_training_system.py index febb348..32a1e52 100644 --- a/ANNOTATE/core/inference_training_system.py +++ b/ANNOTATE/core/inference_training_system.py @@ -34,7 +34,7 @@ class TrainingTriggerType(Enum): @dataclass class InferenceFrameReference: """ - Reference to inference data stored in DuckDB. + Reference to inference data stored in DuckDB with human-readable prediction outputs. No copying - just store timestamp ranges and query when needed. """ inference_id: str # Unique ID for this inference @@ -50,14 +50,27 @@ class InferenceFrameReference: # Normalization parameters (small, can be stored) norm_params: Dict[str, Dict[str, float]] = field(default_factory=dict) - # Prediction metadata - predicted_action: Optional[str] = None - predicted_candle: Optional[Dict[str, List[float]]] = None + # ENHANCED: Human-readable prediction outputs + predicted_action: Optional[str] = None # 'BUY', 'SELL', 'HOLD' + predicted_candle: Optional[Dict[str, List[float]]] = None # {timeframe: [O,H,L,C,V]} + predicted_price: Optional[float] = None # Main predicted price confidence: float = 0.0 + # Model metadata for decision making + model_type: str = 'transformer' # 'transformer', 'cnn', 'dqn' + prediction_steps: int = 1 # Number of steps predicted ahead + # Training status trained: bool = False training_timestamp: Optional[datetime] = None + training_loss: Optional[float] = None + training_accuracy: Optional[float] = None + + # Actual results (filled when candle completes) + actual_candle: Optional[List[float]] = None # [O,H,L,C,V] + actual_price: Optional[float] = None + prediction_error: Optional[float] = None # |predicted - actual| + direction_correct: Optional[bool] = None # Did we predict direction correctly? @dataclass diff --git a/ANNOTATE/core/real_training_adapter.py b/ANNOTATE/core/real_training_adapter.py index 4c051bf..0f19dd5 100644 --- a/ANNOTATE/core/real_training_adapter.py +++ b/ANNOTATE/core/real_training_adapter.py @@ -3439,14 +3439,8 @@ class RealTrainingAdapter: all_signals.sort(key=lambda x: x.get('timestamp', ''), reverse=True) return all_signals[:limit] - def _make_realtime_prediction_with_cache(self, model_name: str, symbol: str, data_provider, session: Dict) -> Tuple[Dict, bool]: - """ - DEPRECATED: Use _make_realtime_prediction + _register_inference_frame instead. - This method is kept for backward compatibility but should be removed. - """ - # Just call the regular prediction method - prediction = self._make_realtime_prediction(model_name, symbol, data_provider) - return prediction, False + # REMOVED: Deprecated _make_realtime_prediction_with_cache method + # Now using unified InferenceFrameReference system """ Make a prediction and store input data frame for later training diff --git a/ANNOTATE/web/app.py b/ANNOTATE/web/app.py index a2a6866..e93aa2f 100644 --- a/ANNOTATE/web/app.py +++ b/ANNOTATE/web/app.py @@ -768,9 +768,8 @@ class AnnotationDashboard: # Backtest runner for replaying visible chart with predictions self.backtest_runner = BacktestRunner() - # Prediction cache for training: stores inference inputs/outputs to compare with actual candles - # Format: {symbol: {timeframe: [{'timestamp': ts, 'inputs': {...}, 'outputs': {...}, 'norm_params': {...}}, ...]}} - self.prediction_cache = {} + # NOTE: Prediction caching is now handled by InferenceFrameReference system + # See ANNOTATE/core/inference_training_system.py for the unified implementation # Check if we should auto-load a model at startup auto_load_model = os.getenv('AUTO_LOAD_MODEL', 'Transformer') # Default: Transformer @@ -2636,6 +2635,7 @@ class AnnotationDashboard: response = { 'success': True, + 'server_time': datetime.now(timezone.utc).isoformat(), # Add server timestamp to detect stale data 'chart_updates': {}, # Dict of timeframe -> chart_update 'prediction': None # Single prediction for all timeframes } @@ -3445,34 +3445,8 @@ class AnnotationDashboard: elif '1s' in predicted_candles_denorm: predicted_price = predicted_candles_denorm['1s'][3] - # CACHE inference data for later training - # Store inputs, outputs, and normalization params so we can train when actual candle arrives - if symbol not in self.prediction_cache: - self.prediction_cache[symbol] = {} - if timeframe not in self.prediction_cache[symbol]: - self.prediction_cache[symbol][timeframe] = [] - - # Store cached inference data (convert tensors to CPU for storage) - cached_data = { - 'timestamp': timestamp, - 'symbol': symbol, - 'timeframe': timeframe, - 'model_inputs': {k: v.cpu().clone() if isinstance(v, torch.Tensor) else v - for k, v in market_data.items()}, - 'model_outputs': {k: v.cpu().clone() if isinstance(v, torch.Tensor) else v - for k, v in outputs.items()}, - 'normalization_params': norm_params, - 'predicted_candle': predicted_candles_denorm.get(timeframe), - 'prediction_steps': prediction_steps - } - - self.prediction_cache[symbol][timeframe].append(cached_data) - - # Keep only last 100 predictions per symbol/timeframe to prevent memory bloat - if len(self.prediction_cache[symbol][timeframe]) > 100: - self.prediction_cache[symbol][timeframe] = self.prediction_cache[symbol][timeframe][-100:] - - logger.debug(f"Cached prediction for {symbol} {timeframe} @ {timestamp.isoformat()}") + # NOTE: Caching is now handled by InferenceFrameReference system in real_training_adapter + # This provides more efficient reference-based storage without copying 600 candles # Return prediction result (same format as before for compatibility) return { @@ -3492,69 +3466,8 @@ class AnnotationDashboard: logger.debug(traceback.format_exc()) return None - def get_cached_predictions_for_training(self, symbol: str, timeframe: str, actual_candle_timestamp) -> List[Dict]: - """ - Retrieve cached predictions that match a specific candle timestamp for training - - When an actual candle arrives, we can: - 1. Find cached predictions made before this candle - 2. Compare predicted vs actual candle values - 3. Calculate loss and do backpropagation - - Args: - symbol: Trading symbol - timeframe: Timeframe - actual_candle_timestamp: Timestamp of the actual candle that just arrived - - Returns: - List of cached prediction dicts that should be trained on - """ - try: - if symbol not in self.prediction_cache: - return [] - if timeframe not in self.prediction_cache[symbol]: - return [] - - # Find predictions made before this candle timestamp - # Predictions should be for candles that have now completed - matching_predictions = [] - actual_time = actual_candle_timestamp if isinstance(actual_candle_timestamp, datetime) else datetime.fromisoformat(str(actual_candle_timestamp).replace('Z', '+00:00')) - - for cached_pred in self.prediction_cache[symbol][timeframe]: - pred_time = cached_pred['timestamp'] - if isinstance(pred_time, str): - pred_time = datetime.fromisoformat(pred_time.replace('Z', '+00:00')) - - # Prediction should be for a candle that comes after the prediction time - # We match predictions that were made before the actual candle closed - if pred_time < actual_time: - matching_predictions.append(cached_pred) - - return matching_predictions - - except Exception as e: - logger.error(f"Error getting cached predictions for training: {e}") - return [] - - def clear_old_cached_predictions(self, symbol: str, timeframe: str, before_timestamp: datetime): - """ - Clear cached predictions older than a certain timestamp - - Useful for cleaning up old predictions that are no longer needed - """ - try: - if symbol not in self.prediction_cache: - return - if timeframe not in self.prediction_cache[symbol]: - return - - self.prediction_cache[symbol][timeframe] = [ - pred for pred in self.prediction_cache[symbol][timeframe] - if pred['timestamp'] >= before_timestamp - ] - - except Exception as e: - logger.debug(f"Error clearing old cached predictions: {e}") + # REMOVED: Unused prediction caching methods + # Now using InferenceFrameReference system for unified prediction storage and training def run(self, host='0.0.0.0', port=8051, debug=False): """Run the application - binds to all interfaces by default""" diff --git a/ANNOTATE/web/static/js/chart_manager.js b/ANNOTATE/web/static/js/chart_manager.js index 04fab09..1ec8f83 100644 --- a/ANNOTATE/web/static/js/chart_manager.js +++ b/ANNOTATE/web/static/js/chart_manager.js @@ -548,9 +548,17 @@ class ChartManager { */ updateLatestCandle(symbol, timeframe, candle) { try { + console.log(`[updateLatestCandle] Called for ${timeframe}:`, { + symbol: symbol, + timestamp: candle.timestamp, + is_confirmed: candle.is_confirmed, + hasChart: !!this.charts[timeframe], + availableCharts: Object.keys(this.charts) + }); + const chart = this.charts[timeframe]; if (!chart) { - console.debug(`Chart ${timeframe} not found for live update`); + console.warn(`[updateLatestCandle] Chart ${timeframe} not found for live update. Available charts:`, Object.keys(this.charts)); return; } @@ -558,7 +566,7 @@ class ChartManager { const plotElement = document.getElementById(plotId); if (!plotElement) { - console.debug(`Plot element ${plotId} not found`); + console.warn(`[updateLatestCandle] Plot element ${plotId} not found in DOM`); return; } @@ -575,11 +583,11 @@ class ChartManager { } // CRITICAL FIX: Parse timestamp ensuring UTC handling - // Backend now sends ISO format with 'Z' (e.g., '2025-12-08T21:00:00Z') + // Backend now sends ISO format with timezone (e.g., '2025-12-10T09:19:51+00:00') // JavaScript Date will parse this correctly as UTC let candleTimestamp; if (typeof candle.timestamp === 'string') { - // If it's already ISO format with 'Z', parse directly + // If it's already ISO format with 'Z' or timezone offset, parse directly if (candle.timestamp.includes('T') && (candle.timestamp.endsWith('Z') || candle.timestamp.includes('+'))) { candleTimestamp = new Date(candle.timestamp); } else if (candle.timestamp.includes('T')) { @@ -593,6 +601,12 @@ class ChartManager { candleTimestamp = new Date(candle.timestamp); } + // Validate timestamp + if (isNaN(candleTimestamp.getTime())) { + console.error(`[${timeframe}] Invalid timestamp: ${candle.timestamp}`); + return; + } + // Format using UTC methods and ISO format with 'Z' for consistency const year = candleTimestamp.getUTCFullYear(); const month = String(candleTimestamp.getUTCMonth() + 1).padStart(2, '0'); @@ -604,65 +618,86 @@ class ChartManager { const formattedTimestamp = `${year}-${month}-${day}T${hours}:${minutes}:${seconds}Z`; // Get current chart data from Plotly - const chartData = Plotly.Plots.data(plotId); + const chartData = plotElement.data; if (!chartData || chartData.length < 2) { - console.debug(`Chart ${plotId} not initialized yet`); + console.warn(`[updateLatestCandle] Chart ${plotId} not initialized yet (no data traces)`); return; } const candlestickTrace = chartData[0]; const volumeTrace = chartData[1]; + // Ensure we have valid trace data + if (!candlestickTrace || !candlestickTrace.x || candlestickTrace.x.length === 0) { + console.warn(`[updateLatestCandle] Candlestick trace has no data for ${timeframe}`); + return; + } + + console.log(`[updateLatestCandle] Chart ${timeframe} has ${candlestickTrace.x.length} candles currently`); + + // CRITICAL FIX: Check is_confirmed flag first + // If candle is confirmed, it's a NEW completed candle (not an update to the current one) + const isConfirmed = candle.is_confirmed === true; + // Check if this is updating the last candle or adding a new one // Use more lenient comparison to handle timestamp format differences const lastTimestamp = candlestickTrace.x[candlestickTrace.x.length - 1]; const lastTimeMs = lastTimestamp ? new Date(lastTimestamp).getTime() : 0; const candleTimeMs = candleTimestamp.getTime(); - // Consider it a new candle if timestamp is at least 500ms newer (to handle jitter) - const isNewCandle = !lastTimestamp || (candleTimeMs - lastTimeMs) >= 500; - if (isNewCandle) { - // Add new candle - update both Plotly and internal data structure - Plotly.extendTraces(plotId, { - x: [[formattedTimestamp]], - open: [[candle.open]], - high: [[candle.high]], - low: [[candle.low]], - close: [[candle.close]] - }, [0]); - - // Update volume color based on price direction - const volumeColor = candle.close >= candle.open ? '#10b981' : '#ef4444'; - Plotly.extendTraces(plotId, { - x: [[formattedTimestamp]], - y: [[candle.volume]], - marker: { color: [[volumeColor]] } - }, [1]); - - // Update internal data structure - chart.data.timestamps.push(formattedTimestamp); - chart.data.open.push(candle.open); - chart.data.high.push(candle.high); - chart.data.low.push(candle.low); - chart.data.close.push(candle.close); - chart.data.volume.push(candle.volume); - - console.log(`[${timeframe}] Added new candle: ${formattedTimestamp}`, { + // Determine if this is a new candle: + // 1. If no last timestamp exists, it's always new + // 2. If timestamp is significantly newer (at least 1 second for 1s, or timeframe period for others) + // 3. If confirmed AND timestamp is different, it's a new candle + // 4. If confirmed AND timestamp matches, we REPLACE the last candle (it was forming, now confirmed) + let timeframePeriodMs = 1000; // Default 1 second + if (timeframe === '1m') timeframePeriodMs = 60000; + else if (timeframe === '1h') timeframePeriodMs = 3600000; + else if (timeframe === '1d') timeframePeriodMs = 86400000; + + // Check if timestamps match (within 1 second tolerance) + const timestampMatches = lastTimestamp && Math.abs(candleTimeMs - lastTimeMs) < 1000; + + // If confirmed and timestamp matches, we replace the last candle (it was forming, now confirmed) + // Otherwise, if timestamp is newer or confirmed with different timestamp, it's a new candle + const isNewCandle = !lastTimestamp || + (isConfirmed && !timestampMatches) || + (!isConfirmed && (candleTimeMs - lastTimeMs) >= timeframePeriodMs); + + // Special case: if confirmed and timestamp matches, we update the last candle (replace forming with confirmed) + const shouldReplaceLast = isConfirmed && timestampMatches && lastTimestamp; + + if (shouldReplaceLast) { + // Special case: Confirmed candle with same timestamp - replace the last candle (forming -> confirmed) + console.log(`[${timeframe}] REPLACING last candle (forming -> confirmed): ${formattedTimestamp}`, { + timestamp: candle.timestamp, open: candle.open, high: candle.high, low: candle.low, close: candle.close, volume: candle.volume }); - } else { - // Update last candle - update both Plotly and internal data structure + + // Use the same update logic as updating existing candle const x = [...candlestickTrace.x]; const open = [...candlestickTrace.open]; const high = [...candlestickTrace.high]; const low = [...candlestickTrace.low]; const close = [...candlestickTrace.close]; const volume = [...volumeTrace.y]; - const colors = Array.isArray(volumeTrace.marker.color) ? [...volumeTrace.marker.color] : [volumeTrace.marker.color]; + + // Handle volume colors + let colors; + if (Array.isArray(volumeTrace.marker.color)) { + colors = [...volumeTrace.marker.color]; + } else if (volumeTrace.marker && volumeTrace.marker.color) { + colors = new Array(volume.length).fill(volumeTrace.marker.color); + } else { + colors = volume.map((v, i) => { + if (i === 0) return '#3b82f6'; + return close[i] >= open[i] ? '#10b981' : '#ef4444'; + }); + } const lastIdx = x.length - 1; @@ -700,7 +735,141 @@ class ChartManager { chart.data.volume[lastIdx] = candle.volume; } - console.log(`[${timeframe}] Updated last candle: ${formattedTimestamp}`); + console.log(`[${timeframe}] Successfully replaced last candle (confirmed)`); + } else if (isNewCandle) { + // Add new candle - update both Plotly and internal data structure + console.log(`[${timeframe}] Adding NEW candle (confirmed: ${isConfirmed}): ${formattedTimestamp}`, { + timestamp: candle.timestamp, + formattedTimestamp: formattedTimestamp, + open: candle.open, + high: candle.high, + low: candle.low, + close: candle.close, + volume: candle.volume, + lastTimestamp: lastTimestamp, + timeDiff: lastTimestamp ? (candleTimeMs - lastTimeMs) + 'ms' : 'N/A', + currentCandleCount: candlestickTrace.x.length + }); + + try { + // CRITICAL: Plotly.extendTraces expects arrays of arrays + // Each trace gets an array, and each array contains the new data points + Plotly.extendTraces(plotId, { + x: [[formattedTimestamp]], + open: [[candle.open]], + high: [[candle.high]], + low: [[candle.low]], + close: [[candle.close]] + }, [0]).then(() => { + console.log(`[${timeframe}] Candlestick trace extended successfully`); + }).catch(err => { + console.error(`[${timeframe}] Error extending candlestick trace:`, err); + }); + + // Update volume color based on price direction + const volumeColor = candle.close >= candle.open ? '#10b981' : '#ef4444'; + Plotly.extendTraces(plotId, { + x: [[formattedTimestamp]], + y: [[candle.volume]], + marker: { color: [[volumeColor]] } + }, [1]).then(() => { + console.log(`[${timeframe}] Volume trace extended successfully`); + }).catch(err => { + console.error(`[${timeframe}] Error extending volume trace:`, err); + }); + + // Update internal data structure + chart.data.timestamps.push(formattedTimestamp); + chart.data.open.push(candle.open); + chart.data.high.push(candle.high); + chart.data.low.push(candle.low); + chart.data.close.push(candle.close); + chart.data.volume.push(candle.volume); + + console.log(`[${timeframe}] Successfully added new candle. Total candles: ${chart.data.timestamps.length}`); + } catch (error) { + console.error(`[${timeframe}] Error adding new candle:`, error); + } + } else { + // Update last candle - update both Plotly and internal data structure + console.log(`[${timeframe}] Updating EXISTING candle: ${formattedTimestamp}`, { + timestamp: candle.timestamp, + open: candle.open, + high: candle.high, + low: candle.low, + close: candle.close, + volume: candle.volume, + lastTimestamp: lastTimestamp, + timeDiff: (candleTimeMs - lastTimeMs) + 'ms' + }); + + const x = [...candlestickTrace.x]; + const open = [...candlestickTrace.open]; + const high = [...candlestickTrace.high]; + const low = [...candlestickTrace.low]; + const close = [...candlestickTrace.close]; + const volume = [...volumeTrace.y]; + + // Handle volume colors - ensure it's an array + let colors; + if (Array.isArray(volumeTrace.marker.color)) { + colors = [...volumeTrace.marker.color]; + } else if (volumeTrace.marker && volumeTrace.marker.color) { + // Single color - convert to array + colors = new Array(volume.length).fill(volumeTrace.marker.color); + } else { + // No color - create default array + colors = volume.map((v, i) => { + if (i === 0) return '#3b82f6'; + return close[i] >= open[i] ? '#10b981' : '#ef4444'; + }); + } + + const lastIdx = x.length - 1; + + // Update local arrays + x[lastIdx] = formattedTimestamp; + open[lastIdx] = candle.open; + high[lastIdx] = candle.high; + low[lastIdx] = candle.low; + close[lastIdx] = candle.close; + volume[lastIdx] = candle.volume; + colors[lastIdx] = candle.close >= candle.open ? '#10b981' : '#ef4444'; + + // Push updates to Plotly + Plotly.restyle(plotId, { + x: [x], + open: [open], + high: [high], + low: [low], + close: [close] + }, [0]); + + Plotly.restyle(plotId, { + x: [x], + y: [volume], + 'marker.color': [colors] + }, [1]); + + // Update internal data structure + if (chart.data.timestamps.length > lastIdx) { + chart.data.timestamps[lastIdx] = formattedTimestamp; + chart.data.open[lastIdx] = candle.open; + chart.data.high[lastIdx] = candle.high; + chart.data.low[lastIdx] = candle.low; + chart.data.close[lastIdx] = candle.close; + chart.data.volume[lastIdx] = candle.volume; + } else { + // If internal data is shorter, append + chart.data.timestamps.push(formattedTimestamp); + chart.data.open.push(candle.open); + chart.data.high.push(candle.high); + chart.data.low.push(candle.low); + chart.data.close.push(candle.close); + chart.data.volume.push(candle.volume); + } + + console.log(`[${timeframe}] Successfully updated last candle`); } // CRITICAL: Check if we have enough candles to validate predictions (2s delay logic) @@ -2024,10 +2193,7 @@ class ChartManager { plotElement.style.height = `${chartHeight}px`; // Trigger Plotly resize - const plotId = plotElement.id; - if (plotId) { - Plotly.Plots.resize(plotId); - } + Plotly.Plots.resize(plotElement); } }); } else { @@ -2040,10 +2206,7 @@ class ChartManager { plotElement.style.height = '300px'; // Trigger Plotly resize - const plotId = plotElement.id; - if (plotId) { - Plotly.Plots.resize(plotId); - } + Plotly.Plots.resize(plotElement); } }); } @@ -3087,11 +3250,11 @@ class ChartManager { console.log(`[updatePredictions] Timeframe: ${timeframe}, Predictions:`, predictions); const plotId = chart.plotId; - const plotElement = document.getElementById(plotId); - if (!plotElement) return; + const chartElement = document.getElementById(plotId); + if (!chartElement) return; // Get current chart data - const chartData = plotElement.data; + const chartData = chartElement.data; if (!chartData || chartData.length < 2) return; // Prepare prediction markers diff --git a/ANNOTATE/web/static/js/live_updates_polling.js b/ANNOTATE/web/static/js/live_updates_polling.js index de3ded5..9215895 100644 --- a/ANNOTATE/web/static/js/live_updates_polling.js +++ b/ANNOTATE/web/static/js/live_updates_polling.js @@ -83,11 +83,27 @@ class LiveUpdatesPolling { // Handle chart updates for each timeframe if (data.chart_updates && this.onChartUpdate) { // chart_updates is an object: { '1s': {...}, '1m': {...}, ... } + console.log('[Live Updates] Processing chart_updates:', Object.keys(data.chart_updates)); Object.entries(data.chart_updates).forEach(([timeframe, update]) => { if (update) { + console.log(`[Live Updates] Calling onChartUpdate for ${timeframe}:`, { + symbol: update.symbol, + timeframe: update.timeframe, + timestamp: update.candle?.timestamp, + is_confirmed: update.is_confirmed + }); this.onChartUpdate(update); + } else { + console.warn(`[Live Updates] Update for ${timeframe} is null/undefined`); } }); + } else { + if (!data.chart_updates) { + console.debug('[Live Updates] No chart_updates in response'); + } + if (!this.onChartUpdate) { + console.warn('[Live Updates] onChartUpdate callback not set!'); + } } // Handle prediction update (single prediction for all timeframes) @@ -169,8 +185,33 @@ document.addEventListener('DOMContentLoaded', function() { window.liveUpdatesPolling.onChartUpdate = function(data) { // Update chart with new candle + // data structure: { symbol, timeframe, candle: {...}, is_confirmed: true/false } + console.log('[onChartUpdate] Callback invoked with data:', { + symbol: data.symbol, + timeframe: data.timeframe, + hasCandle: !!data.candle, + is_confirmed: data.is_confirmed, + hasAppState: !!window.appState, + hasChartManager: !!(window.appState && window.appState.chartManager) + }); + if (window.appState && window.appState.chartManager) { - window.appState.chartManager.updateLatestCandle(data.symbol, data.timeframe, data.candle); + // Pass the full update object so is_confirmed is available + const candleWithFlag = { + ...data.candle, + is_confirmed: data.is_confirmed + }; + console.log('[onChartUpdate] Calling updateLatestCandle with:', { + symbol: data.symbol, + timeframe: data.timeframe, + candle: candleWithFlag + }); + window.appState.chartManager.updateLatestCandle(data.symbol, data.timeframe, candleWithFlag); + } else { + console.warn('[onChartUpdate] Chart manager not available!', { + hasAppState: !!window.appState, + hasChartManager: !!(window.appState && window.appState.chartManager) + }); } }; diff --git a/ANNOTATE/web/static/js/live_updates_ws.js b/ANNOTATE/web/static/js/live_updates_ws.js index d6bebc5..d241808 100644 --- a/ANNOTATE/web/static/js/live_updates_ws.js +++ b/ANNOTATE/web/static/js/live_updates_ws.js @@ -219,8 +219,13 @@ document.addEventListener('DOMContentLoaded', function() { window.liveUpdatesWS.onChartUpdate = function(data) { // Update chart with new candle + // data structure: { symbol, timeframe, candle: {...}, is_confirmed: true/false } if (window.appState && window.appState.chartManager) { - window.appState.chartManager.updateLatestCandle(data.symbol, data.timeframe, data.candle); + // Pass the full update object so is_confirmed is available + window.appState.chartManager.updateLatestCandle(data.symbol, data.timeframe, { + ...data.candle, + is_confirmed: data.is_confirmed + }); } }; diff --git a/core/cob_integration.py b/core/cob_integration.py index 5a605f7..70233a7 100644 --- a/core/cob_integration.py +++ b/core/cob_integration.py @@ -17,7 +17,7 @@ import asyncio import logging import numpy as np import pandas as pd -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from typing import Dict, List, Optional, Any, Callable from threading import Thread import json @@ -94,10 +94,24 @@ class COBIntegration: # Initialize Enhanced WebSocket first try: - # Enhanced WebSocket initialization would go here - logger.info("Enhanced WebSocket initialized successfully") + from .enhanced_cob_websocket import EnhancedCOBWebSocket + + # Initialize Enhanced WebSocket with dashboard callback + self.enhanced_websocket = EnhancedCOBWebSocket( + symbols=self.symbols, + dashboard_callback=self._on_websocket_status_update + ) + + # Add callback for COB data updates + self.enhanced_websocket.add_cob_callback(self._on_enhanced_cob_update) + + # Start the WebSocket connection + await self.enhanced_websocket.start() + + logger.info("Enhanced WebSocket initialized and started successfully") except Exception as e: logger.error(f" Error starting Enhanced WebSocket: {e}") + # Continue without WebSocket - will use API fallback # Skip COB provider backup since Enhanced WebSocket is working perfectly logger.info("Skipping COB provider backup - Enhanced WebSocket provides all needed data") @@ -118,7 +132,23 @@ class COBIntegration: async def _on_enhanced_cob_update(self, symbol: str, cob_data: Dict): """Handle COB updates from Enhanced WebSocket""" try: - logger.debug(f"Enhanced WebSocket COB update for {symbol}") + logger.debug(f"Enhanced WebSocket COB update for {symbol}: {cob_data.get('type', 'unknown')}") + + # Handle candlestick data - convert to OHLCV and update data provider + if cob_data.get('type') == 'candlestick' and self.data_provider: + candlestick = cob_data.get('data', {}) + if candlestick: + # Convert WebSocket candlestick to tick format for data provider + tick = { + 'timestamp': datetime.fromtimestamp(candlestick.get('close_time', 0) / 1000, tz=timezone.utc), + 'price': float(candlestick.get('close_price', 0)), + 'volume': float(candlestick.get('volume', 0)) + } + + # Update data provider with live tick (this will update real_time_data) + if hasattr(self.data_provider, '_process_tick'): + self.data_provider._process_tick(symbol, tick) + logger.debug(f"Updated data provider with live candle: {symbol} @ {tick['price']}") # Convert enhanced WebSocket data to COB format for existing callbacks # Notify CNN callbacks diff --git a/core/data_provider.py b/core/data_provider.py index ccdd74d..08fdd0d 100644 --- a/core/data_provider.py +++ b/core/data_provider.py @@ -3775,10 +3775,22 @@ class DataProvider: logger.error(f"Error emitting pivot event: {e}", exc_info=True) def get_latest_candles(self, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame: - """Get the latest candles from cached data only""" + """Get the latest candles combining cached data with real-time data""" try: - # Get cached data - cached_df = self.get_historical_data(symbol, timeframe, limit=limit) + # Check for real-time data first + has_real_time_data = False + with self.data_lock: + if symbol in self.real_time_data and timeframe in self.real_time_data[symbol]: + real_time_candles = list(self.real_time_data[symbol][timeframe]) + has_real_time_data = bool(real_time_candles) + + # If no real-time data available, force refresh from API for live updates + if not has_real_time_data and limit <= 10: # Small limit suggests live update request + logger.debug(f"No real-time data for {symbol} {timeframe}, forcing API refresh for live update") + cached_df = self.get_historical_data(symbol, timeframe, limit=limit, refresh=True) + else: + # Get cached data normally + cached_df = self.get_historical_data(symbol, timeframe, limit=limit) # Get real-time data if available with self.data_lock: @@ -3786,24 +3798,29 @@ class DataProvider: real_time_candles = list(self.real_time_data[symbol][timeframe]) if real_time_candles: - # Convert to DataFrame + # Convert to DataFrame and ensure proper format rt_df = pd.DataFrame(real_time_candles) + rt_df = self._ensure_datetime_index(rt_df) if cached_df is not None and not cached_df.empty: # Combine cached and real-time # Remove overlapping candles from cached data if not rt_df.empty: - cutoff_time = rt_df['timestamp'].min() + cutoff_time = rt_df.index.min() cached_df = cached_df[cached_df.index < cutoff_time] - # Concatenate - combined_df = pd.concat([cached_df, rt_df], ignore_index=True) + # Concatenate and sort by index + combined_df = pd.concat([cached_df, rt_df]) + combined_df = combined_df.sort_index() + combined_df = combined_df[~combined_df.index.duplicated(keep='last')] else: combined_df = rt_df + logger.debug(f"Combined data for {symbol} {timeframe}: {len(cached_df) if cached_df is not None else 0} cached + {len(rt_df)} real-time") return combined_df.tail(limit) # Return just cached data if no real-time data + logger.debug(f"Returning cached data only for {symbol} {timeframe}: {len(cached_df) if cached_df is not None else 0} candles") return cached_df.tail(limit) if cached_df is not None else pd.DataFrame() except Exception as e: