diff --git a/ANNOTATE/WEBSOCKET_LIVE_UPDATES.md b/ANNOTATE/WEBSOCKET_LIVE_UPDATES.md new file mode 100644 index 0000000..e69de29 diff --git a/ANNOTATE/web/app.py b/ANNOTATE/web/app.py index ec3c561..ea1e8cb 100644 --- a/ANNOTATE/web/app.py +++ b/ANNOTATE/web/app.py @@ -131,6 +131,23 @@ class AnnotationDashboard: static_folder='static' ) + # Initialize SocketIO for WebSocket support + try: + from flask_socketio import SocketIO, emit + self.socketio = SocketIO( + self.server, + cors_allowed_origins="*", + async_mode='threading', + logger=False, + engineio_logger=False + ) + self.has_socketio = True + logger.info("✅ SocketIO initialized for real-time updates") + except ImportError: + self.socketio = None + self.has_socketio = False + logger.warning("⚠️ flask-socketio not installed - live updates will use polling") + # Suppress werkzeug request logs (reduce noise from polling endpoints) werkzeug_logger = logging.getLogger('werkzeug') werkzeug_logger.setLevel(logging.WARNING) # Only show warnings and errors, not INFO @@ -556,8 +573,13 @@ class AnnotationDashboard: def index(): """Main dashboard page - loads existing annotations""" try: - # Get all existing annotations - annotations = self.annotation_manager.get_annotations() + # Get symbols and timeframes from config + symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT']) + timeframes = self.config.get('timeframes', ['1s', '1m', '1h', '1d']) + current_symbol = symbols[0] if symbols else 'ETH/USDT' + + # Get annotations filtered by current symbol + annotations = self.annotation_manager.get_annotations(symbol=current_symbol) # Convert to serializable format annotations_data = [] @@ -580,15 +602,11 @@ class AnnotationDashboard: 'created_at': ann_dict.get('created_at') }) - logger.info(f"Loading dashboard with {len(annotations_data)} existing annotations") - - # Get symbols and timeframes from config - symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT']) - timeframes = self.config.get('timeframes', ['1s', '1m', '1h', '1d']) + logger.info(f"Loading dashboard with {len(annotations_data)} annotations for {current_symbol}") # Prepare template data template_data = { - 'current_symbol': symbols[0] if symbols else 'ETH/USDT', # Use first symbol as default + 'current_symbol': current_symbol, 'symbols': symbols, 'timeframes': timeframes, 'annotations': annotations_data @@ -1112,6 +1130,52 @@ class AnnotationDashboard: } }) + @self.server.route('/api/get-annotations', methods=['POST']) + def get_annotations_api(): + """Get annotations filtered by symbol""" + try: + data = request.get_json() + symbol = data.get('symbol', 'ETH/USDT') + + # Get annotations for this symbol + annotations = self.annotation_manager.get_annotations(symbol=symbol) + + # Convert to serializable format + annotations_data = [] + for ann in annotations: + if hasattr(ann, '__dict__'): + ann_dict = ann.__dict__ + else: + ann_dict = ann + + annotations_data.append({ + 'annotation_id': ann_dict.get('annotation_id'), + 'symbol': ann_dict.get('symbol'), + 'timeframe': ann_dict.get('timeframe'), + 'entry': ann_dict.get('entry'), + 'exit': ann_dict.get('exit'), + 'direction': ann_dict.get('direction'), + 'profit_loss_pct': ann_dict.get('profit_loss_pct'), + 'notes': ann_dict.get('notes', ''), + 'created_at': ann_dict.get('created_at') + }) + + logger.info(f"Returning {len(annotations_data)} annotations for {symbol}") + + return jsonify({ + 'success': True, + 'annotations': annotations_data, + 'symbol': symbol, + 'count': len(annotations_data) + }) + + except Exception as e: + logger.error(f"Error getting annotations: {e}") + return jsonify({ + 'success': False, + 'error': str(e) + }) + @self.server.route('/api/export-annotations', methods=['POST']) def export_annotations(): """Export annotations to file""" @@ -1158,17 +1222,20 @@ class AnnotationDashboard: model_name = data['model_name'] annotation_ids = data.get('annotation_ids', []) - # If no specific annotations provided, use all + # CRITICAL: Get current symbol to filter annotations + current_symbol = data.get('symbol', 'ETH/USDT') + + # If no specific annotations provided, use all for current symbol if not annotation_ids: - annotations = self.annotation_manager.get_annotations() + annotations = self.annotation_manager.get_annotations(symbol=current_symbol) annotation_ids = [ a.annotation_id if hasattr(a, 'annotation_id') else a.get('annotation_id') for a in annotations ] + logger.info(f"Using all {len(annotation_ids)} annotations for {current_symbol}") # Load test cases from disk (they were auto-generated when annotations were saved) - # CRITICAL: Filter by current symbol to avoid cross-symbol training - current_symbol = data.get('symbol', 'ETH/USDT') + # Filter by current symbol to avoid cross-symbol training all_test_cases = self.annotation_manager.get_all_test_cases(symbol=current_symbol) # Filter to selected annotations @@ -1566,11 +1633,160 @@ class AnnotationDashboard: 'message': str(e) } }) + + # WebSocket event handlers (if SocketIO is available) + if self.has_socketio: + self._setup_websocket_handlers() + + def _setup_websocket_handlers(self): + """Setup WebSocket event handlers for real-time updates""" + if not self.has_socketio: + return + + @self.socketio.on('connect') + def handle_connect(): + """Handle client connection""" + logger.info(f"WebSocket client connected") + from flask_socketio import emit + emit('connection_response', {'status': 'connected', 'message': 'Connected to ANNOTATE live updates'}) + + @self.socketio.on('disconnect') + def handle_disconnect(): + """Handle client disconnection""" + logger.info(f"WebSocket client disconnected") + + @self.socketio.on('subscribe_live_updates') + def handle_subscribe(data): + """Subscribe to live chart and prediction updates""" + from flask_socketio import emit, join_room + symbol = data.get('symbol', 'ETH/USDT') + timeframe = data.get('timeframe', '1s') + room = f"{symbol}_{timeframe}" + + join_room(room) + logger.info(f"Client subscribed to live updates: {room}") + emit('subscription_confirmed', {'room': room, 'symbol': symbol, 'timeframe': timeframe}) + + # Start live update thread if not already running + if not hasattr(self, '_live_update_thread') or not self._live_update_thread.is_alive(): + self._start_live_update_thread() + + @self.socketio.on('request_prediction') + def handle_prediction_request(data): + """Handle manual prediction request""" + from flask_socketio import emit + try: + symbol = data.get('symbol', 'ETH/USDT') + timeframe = data.get('timeframe', '1s') + prediction_steps = data.get('prediction_steps', 1) + + # Get prediction from model + prediction = self._get_live_prediction(symbol, timeframe, prediction_steps) + + emit('prediction_update', prediction) + except Exception as e: + logger.error(f"Error handling prediction request: {e}") + emit('prediction_error', {'error': str(e)}) + + def _start_live_update_thread(self): + """Start background thread for live updates""" + import threading + + def live_update_worker(): + """Background worker for live updates""" + import time + from flask_socketio import emit + + logger.info("Live update thread started") + + while True: + try: + # Get active rooms (symbol_timeframe combinations) + # For now, update all subscribed clients every second + + # Get latest chart data + if self.data_provider: + for symbol in ['ETH/USDT', 'BTC/USDT']: # TODO: Get from active subscriptions + for timeframe in ['1s', '1m']: + room = f"{symbol}_{timeframe}" + + # Get latest candle + try: + candles = self.data_provider.get_ohlcv(symbol, timeframe, limit=1) + if candles and len(candles) > 0: + latest_candle = candles[-1] + + # Emit chart update + self.socketio.emit('chart_update', { + 'symbol': symbol, + 'timeframe': timeframe, + 'candle': { + 'timestamp': latest_candle.get('timestamp'), + 'open': latest_candle.get('open'), + 'high': latest_candle.get('high'), + 'low': latest_candle.get('low'), + 'close': latest_candle.get('close'), + 'volume': latest_candle.get('volume') + } + }, room=room) + + # Get prediction if model is loaded + if self.orchestrator and hasattr(self.orchestrator, 'primary_transformer'): + prediction = self._get_live_prediction(symbol, timeframe, 1) + if prediction: + self.socketio.emit('prediction_update', prediction, room=room) + + except Exception as e: + logger.debug(f"Error getting data for {symbol} {timeframe}: {e}") + + time.sleep(1) # Update every second + + except Exception as e: + logger.error(f"Error in live update thread: {e}") + time.sleep(5) # Wait longer on error + + self._live_update_thread = threading.Thread(target=live_update_worker, daemon=True) + self._live_update_thread.start() + + def _get_live_prediction(self, symbol: str, timeframe: str, prediction_steps: int = 1): + """Get live prediction from model""" + try: + if not self.orchestrator or not hasattr(self.orchestrator, 'primary_transformer'): + return None + + # Get recent candles for prediction + candles = self.data_provider.get_ohlcv(symbol, timeframe, limit=200) + if not candles or len(candles) < 200: + return None + + # TODO: Implement actual prediction logic + # For now, return placeholder + import random + + return { + 'symbol': symbol, + 'timeframe': timeframe, + 'timestamp': datetime.now().isoformat(), + 'action': random.choice(['BUY', 'SELL', 'HOLD']), + 'confidence': random.uniform(0.6, 0.95), + 'predicted_price': candles[-1].get('close', 0) * (1 + random.uniform(-0.01, 0.01)), + 'prediction_steps': prediction_steps + } + + except Exception as e: + logger.error(f"Error getting live prediction: {e}") + return None def run(self, host='127.0.0.1', port=8051, debug=False): """Run the application""" logger.info(f"Starting Annotation Dashboard on http://{host}:{port}") - self.server.run(host=host, port=port, debug=debug) + + if self.has_socketio: + logger.info("✅ Running with WebSocket support (SocketIO)") + self.socketio.run(self.server, host=host, port=port, debug=debug, allow_unsafe_werkzeug=True) + else: + logger.warning("⚠️ Running without WebSocket support - install flask-socketio for live updates") + self.server.run(host=host, port=port, debug=debug) def main(): diff --git a/ANNOTATE/web/static/js/live_updates_ws.js b/ANNOTATE/web/static/js/live_updates_ws.js new file mode 100644 index 0000000..f2a4468 --- /dev/null +++ b/ANNOTATE/web/static/js/live_updates_ws.js @@ -0,0 +1,243 @@ +/** + * WebSocket-based Live Updates for ANNOTATE + * Provides real-time chart updates and model predictions + */ + +class LiveUpdatesWebSocket { + constructor() { + this.socket = null; + this.connected = false; + this.reconnectAttempts = 0; + this.maxReconnectAttempts = 5; + this.reconnectDelay = 1000; // Start with 1 second + this.subscriptions = new Set(); + + // Callbacks + this.onChartUpdate = null; + this.onPredictionUpdate = null; + this.onConnectionChange = null; + + console.log('LiveUpdatesWebSocket initialized'); + } + + connect() { + if (this.connected) { + console.log('Already connected to WebSocket'); + return; + } + + try { + // Initialize SocketIO connection + this.socket = io({ + transports: ['websocket', 'polling'], + upgrade: true, + rememberUpgrade: true + }); + + this._setupEventHandlers(); + console.log('Connecting to WebSocket...'); + + } catch (error) { + console.error('Failed to initialize WebSocket:', error); + this._scheduleReconnect(); + } + } + + _setupEventHandlers() { + // Connection events + this.socket.on('connect', () => { + console.log('✅ WebSocket connected'); + this.connected = true; + this.reconnectAttempts = 0; + this.reconnectDelay = 1000; + + if (this.onConnectionChange) { + this.onConnectionChange(true); + } + + // Resubscribe to previous subscriptions + this.subscriptions.forEach(sub => { + this._subscribe(sub.symbol, sub.timeframe); + }); + }); + + this.socket.on('disconnect', () => { + console.log('❌ WebSocket disconnected'); + this.connected = false; + + if (this.onConnectionChange) { + this.onConnectionChange(false); + } + + this._scheduleReconnect(); + }); + + this.socket.on('connection_response', (data) => { + console.log('Connection response:', data); + }); + + this.socket.on('subscription_confirmed', (data) => { + console.log('Subscription confirmed:', data); + }); + + // Data events + this.socket.on('chart_update', (data) => { + console.debug('Chart update received:', data); + if (this.onChartUpdate) { + this.onChartUpdate(data); + } + }); + + this.socket.on('prediction_update', (data) => { + console.debug('Prediction update received:', data); + if (this.onPredictionUpdate) { + this.onPredictionUpdate(data); + } + }); + + this.socket.on('prediction_error', (data) => { + console.error('Prediction error:', data); + }); + + // Error events + this.socket.on('connect_error', (error) => { + console.error('WebSocket connection error:', error); + this._scheduleReconnect(); + }); + + this.socket.on('error', (error) => { + console.error('WebSocket error:', error); + }); + } + + _scheduleReconnect() { + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + console.error('Max reconnection attempts reached. Please refresh the page.'); + return; + } + + this.reconnectAttempts++; + const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); // Exponential backoff + + console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})...`); + + setTimeout(() => { + if (!this.connected) { + this.connect(); + } + }, delay); + } + + subscribe(symbol, timeframe) { + this.subscriptions.add({ symbol, timeframe }); + + if (this.connected) { + this._subscribe(symbol, timeframe); + } + } + + _subscribe(symbol, timeframe) { + if (!this.socket || !this.connected) { + console.warn('Cannot subscribe - not connected'); + return; + } + + console.log(`Subscribing to live updates: ${symbol} ${timeframe}`); + this.socket.emit('subscribe_live_updates', { + symbol: symbol, + timeframe: timeframe + }); + } + + requestPrediction(symbol, timeframe, predictionSteps = 1) { + if (!this.socket || !this.connected) { + console.warn('Cannot request prediction - not connected'); + return; + } + + console.log(`Requesting prediction: ${symbol} ${timeframe} (${predictionSteps} steps)`); + this.socket.emit('request_prediction', { + symbol: symbol, + timeframe: timeframe, + prediction_steps: predictionSteps + }); + } + + disconnect() { + if (this.socket) { + console.log('Disconnecting WebSocket...'); + this.socket.disconnect(); + this.socket = null; + this.connected = false; + this.subscriptions.clear(); + } + } + + isConnected() { + return this.connected; + } +} + +// Global instance +window.liveUpdatesWS = null; + +// Initialize on page load +document.addEventListener('DOMContentLoaded', function() { + // Check if SocketIO is available + if (typeof io === 'undefined') { + console.warn('⚠️ Socket.IO not loaded - live updates will not work'); + console.warn('Add to your HTML'); + return; + } + + // Initialize WebSocket + window.liveUpdatesWS = new LiveUpdatesWebSocket(); + + // Setup callbacks + window.liveUpdatesWS.onConnectionChange = function(connected) { + const statusElement = document.getElementById('ws-connection-status'); + if (statusElement) { + if (connected) { + statusElement.innerHTML = '🟢 Live'; + } else { + statusElement.innerHTML = '🔴 Disconnected'; + } + } + }; + + window.liveUpdatesWS.onChartUpdate = function(data) { + // Update chart with new candle + if (window.appState && window.appState.chartManager) { + window.appState.chartManager.updateLatestCandle(data.symbol, data.timeframe, data.candle); + } + }; + + window.liveUpdatesWS.onPredictionUpdate = function(data) { + // Update prediction display + if (typeof updatePredictionDisplay === 'function') { + updatePredictionDisplay(data); + } + + // Add to prediction history + if (typeof predictionHistory !== 'undefined') { + predictionHistory.unshift(data); + if (predictionHistory.length > 5) { + predictionHistory = predictionHistory.slice(0, 5); + } + if (typeof updatePredictionHistory === 'function') { + updatePredictionHistory(); + } + } + }; + + // Auto-connect + console.log('Auto-connecting to WebSocket...'); + window.liveUpdatesWS.connect(); +}); + +// Cleanup on page unload +window.addEventListener('beforeunload', function() { + if (window.liveUpdatesWS) { + window.liveUpdatesWS.disconnect(); + } +}); diff --git a/ANNOTATE/web/templates/base_layout.html b/ANNOTATE/web/templates/base_layout.html index 99e608c..fe9d662 100644 --- a/ANNOTATE/web/templates/base_layout.html +++ b/ANNOTATE/web/templates/base_layout.html @@ -36,6 +36,9 @@ 0 Annotations + + ⚪ Connecting... + --:--:-- @@ -74,11 +77,15 @@ + + + + {% block extra_js %}{% endblock %} diff --git a/ANNOTATE/web/templates/components/control_panel.html b/ANNOTATE/web/templates/components/control_panel.html index 5f6cc0d..40f87fb 100644 --- a/ANNOTATE/web/templates/components/control_panel.html +++ b/ANNOTATE/web/templates/components/control_panel.html @@ -118,9 +118,52 @@ // Symbol selection document.getElementById('symbol-select').addEventListener('change', function(e) { appState.currentSymbol = e.target.value; + + // Reload annotations for new symbol + reloadAnnotationsForSymbol(appState.currentSymbol); + + // Reload chart data loadInitialData(); }); + // Function to reload annotations when symbol changes + function reloadAnnotationsForSymbol(symbol) { + fetch('/api/get-annotations', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ symbol: symbol }) + }) + .then(response => response.json()) + .then(data => { + if (data.success) { + // Update app state with filtered annotations + appState.annotations = data.annotations; + + // Clear existing annotations from chart + if (appState.chartManager) { + appState.chartManager.clearAllAnnotations(); + + // Add new annotations to chart + data.annotations.forEach(annotation => { + appState.chartManager.addAnnotation(annotation); + }); + } + + // Update annotation list UI + if (typeof renderAnnotationsList === 'function') { + renderAnnotationsList(appState.annotations); + } + + console.log(`Loaded ${data.count} annotations for ${symbol}`); + } else { + console.error('Failed to load annotations:', data.error); + } + }) + .catch(error => { + console.error('Error loading annotations:', error); + }); + } + // Timeframe checkboxes document.querySelectorAll('.form-check-input[id^="tf-"]').forEach(checkbox => { checkbox.addEventListener('change', function() { diff --git a/ANNOTATE/web/templates/components/training_panel.html b/ANNOTATE/web/templates/components/training_panel.html index d59acd9..cb69944 100644 --- a/ANNOTATE/web/templates/components/training_panel.html +++ b/ANNOTATE/web/templates/components/training_panel.html @@ -376,7 +376,8 @@ headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ model_name: modelName, - annotation_ids: annotationIds + annotation_ids: annotationIds, + symbol: appState.currentSymbol // CRITICAL: Filter by current symbol }) }) .then(response => response.json()) @@ -568,6 +569,38 @@ }); }); + function updatePredictionHistory() { + const historyDiv = document.getElementById('prediction-history'); + if (predictionHistory.length === 0) { + historyDiv.innerHTML = '
No predictions yet...
'; + return; + } + + // Display last 5 predictions (most recent first) + const html = predictionHistory.slice(0, 5).map(pred => { + const time = new Date(pred.timestamp).toLocaleTimeString(); + const actionColor = pred.action === 'BUY' ? 'text-success' : + pred.action === 'SELL' ? 'text-danger' : 'text-secondary'; + const confidence = (pred.confidence * 100).toFixed(1); + const price = pred.predicted_price ? pred.predicted_price.toFixed(2) : '--'; + + return ` +
+
+ ${pred.action} + ${time} +
+
+
${confidence}%
+
$${price}
+
+
+ `; + }).join(''); + + historyDiv.innerHTML = html; + } + function startSignalPolling() { signalPollInterval = setInterval(function () { // Poll for signals @@ -580,6 +613,18 @@ document.getElementById('latest-confidence').textContent = (latest.confidence * 100).toFixed(1) + '%'; + // Add to prediction history (keep last 5) + predictionHistory.unshift({ + timestamp: latest.timestamp || new Date().toISOString(), + action: latest.action, + confidence: latest.confidence, + predicted_price: latest.predicted_price + }); + if (predictionHistory.length > 5) { + predictionHistory = predictionHistory.slice(0, 5); + } + updatePredictionHistory(); + // Update chart with signal markers if (appState.chartManager) { displaySignalOnChart(latest);