""" Manual Trade Annotation UI - Main Application A web-based interface for manually marking profitable buy/sell signals on historical market data to generate training test cases for machine learning models. """ import os import sys from pathlib import Path # Add parent directory to path for imports parent_dir = Path(__file__).parent.parent.parent sys.path.insert(0, str(parent_dir)) from flask import Flask, render_template, request, jsonify, send_file from dash import Dash, html import logging from datetime import datetime import json import pandas as pd # Import core components from main system try: from core.data_provider import DataProvider from core.orchestrator import TradingOrchestrator from core.config import get_config except ImportError as e: print(f"Warning: Could not import main system components: {e}") print("Running in standalone mode with limited functionality") DataProvider = None TradingOrchestrator = None get_config = lambda: {} # Import ANNOTATE modules annotate_dir = Path(__file__).parent.parent sys.path.insert(0, str(annotate_dir)) try: from core.annotation_manager import AnnotationManager from core.real_training_adapter import RealTrainingAdapter from core.data_loader import HistoricalDataLoader, TimeRangeManager except ImportError: # Try alternative import path import importlib.util # Load annotation_manager ann_spec = importlib.util.spec_from_file_location( "annotation_manager", annotate_dir / "core" / "annotation_manager.py" ) ann_module = importlib.util.module_from_spec(ann_spec) ann_spec.loader.exec_module(ann_module) AnnotationManager = ann_module.AnnotationManager # Load real_training_adapter (NO SIMULATION!) train_spec = importlib.util.spec_from_file_location( "real_training_adapter", annotate_dir / "core" / "real_training_adapter.py" ) train_module = importlib.util.module_from_spec(train_spec) train_spec.loader.exec_module(train_module) RealTrainingAdapter = train_module.RealTrainingAdapter # Load data_loader data_spec = importlib.util.spec_from_file_location( "data_loader", annotate_dir / "core" / "data_loader.py" ) data_module = importlib.util.module_from_spec(data_spec) data_spec.loader.exec_module(data_module) HistoricalDataLoader = data_module.HistoricalDataLoader TimeRangeManager = data_module.TimeRangeManager # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class AnnotationDashboard: """Main annotation dashboard application""" def __init__(self): """Initialize the dashboard""" # Load configuration try: # Always try YAML loading first since get_config might not work in standalone mode import yaml with open('config.yaml', 'r') as f: self.config = yaml.safe_load(f) logger.info(f"Loaded config via YAML: {len(self.config)} keys") except Exception as e: logger.warning(f"Could not load config via YAML: {e}") try: # Fallback to get_config if available if get_config: self.config = get_config() logger.info(f"Loaded config via get_config: {len(self.config)} keys") else: raise Exception("get_config not available") except Exception as e2: logger.warning(f"Could not load config via get_config: {e2}") # Final fallback config with SOL/USDT self.config = { 'symbols': ['ETH/USDT', 'BTC/USDT', 'SOL/USDT'], 'timeframes': ['1s', '1m', '1h', '1d'] } logger.info("Using fallback config") # Initialize Flask app self.server = Flask( __name__, template_folder='templates', static_folder='static' ) # Initialize Dash app (optional component) self.app = Dash( __name__, server=self.server, url_base_pathname='/dash/', external_stylesheets=[ 'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css', 'https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css' ] ) # Set a simple Dash layout to avoid NoLayoutException self.app.layout = html.Div([ html.H1("ANNOTATE Dashboard", className="text-center mb-4"), html.Div([ html.P("This is the Dash component of the ANNOTATE system."), html.P("The main interface is available at the Flask routes."), html.A("Go to Main Interface", href="/", className="btn btn-primary") ], className="container") ]) # Initialize core components self.data_provider = DataProvider() if DataProvider else None # Enable unified storage for real-time data access if self.data_provider: self._enable_unified_storage_async() self.orchestrator = TradingOrchestrator( data_provider=self.data_provider ) if TradingOrchestrator and self.data_provider else None # Initialize ANNOTATE components self.annotation_manager = AnnotationManager() # Use REAL training adapter - NO SIMULATION! self.training_adapter = RealTrainingAdapter(self.orchestrator, self.data_provider) # Initialize data loader with existing DataProvider self.data_loader = HistoricalDataLoader(self.data_provider) if self.data_provider else None self.time_range_manager = TimeRangeManager(self.data_loader) if self.data_loader else None # Setup routes self._setup_routes() logger.info("Annotation Dashboard initialized") def _enable_unified_storage_async(self): """Enable unified storage system in background thread""" def enable_storage(): try: import asyncio import threading loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Enable unified storage success = loop.run_until_complete( self.data_provider.enable_unified_storage() ) if success: logger.info("✅ ANNOTATE: Unified storage enabled for real-time data") # Get statistics stats = self.data_provider.get_unified_storage_stats() if stats.get('initialized'): logger.info(" Real-time data access: <10ms") logger.info(" Historical data access: <100ms") logger.info(" Annotation data: Available at any timestamp") else: logger.warning("⚠️ ANNOTATE: Unified storage not available, using cached data only") except Exception as e: logger.warning(f"ANNOTATE: Could not enable unified storage: {e}") logger.info("ANNOTATE: Continuing with cached data access") # Start in background thread import threading storage_thread = threading.Thread(target=enable_storage, daemon=True) storage_thread.start() def _get_pivot_markers_for_timeframe(self, symbol: str, timeframe: str, df: pd.DataFrame) -> dict: """ Get pivot markers for a specific timeframe Returns dict with indices mapping to pivot info for that candle """ try: if not self.data_provider: return {} # Get Williams pivot levels for this timeframe pivot_levels = self.data_provider.get_williams_pivot_levels( symbol=symbol, base_timeframe=timeframe, limit=len(df) ) if not pivot_levels: return {} # Build a map of timestamp -> pivot info pivot_map = {} # For each level (1-5), find the last high and last low pivot for level_num, trend_level in pivot_levels.items(): if not hasattr(trend_level, 'pivot_points') or not trend_level.pivot_points: continue # Find last high and last low for this level last_high = None last_low = None for pivot in trend_level.pivot_points: if pivot.pivot_type == 'high': last_high = pivot elif pivot.pivot_type == 'low': last_low = pivot # Add to pivot map if last_high: ts_str = last_high.timestamp.strftime('%Y-%m-%d %H:%M:%S') if ts_str not in pivot_map: pivot_map[ts_str] = {'highs': [], 'lows': []} pivot_map[ts_str]['highs'].append({ 'level': level_num, 'price': last_high.price, 'strength': last_high.strength }) if last_low: ts_str = last_low.timestamp.strftime('%Y-%m-%d %H:%M:%S') if ts_str not in pivot_map: pivot_map[ts_str] = {'highs': [], 'lows': []} pivot_map[ts_str]['lows'].append({ 'level': level_num, 'price': last_low.price, 'strength': last_low.strength }) return pivot_map except Exception as e: logger.error(f"Error getting pivot markers for {timeframe}: {e}") return {} def _setup_routes(self): """Setup Flask routes""" @self.server.route('/favicon.ico') def favicon(): """Serve favicon to prevent 404 errors""" from flask import Response # Return a simple 1x1 transparent pixel as favicon favicon_data = b'\x00\x00\x01\x00\x01\x00\x10\x10\x00\x00\x01\x00\x20\x00\x68\x04\x00\x00\x16\x00\x00\x00\x28\x00\x00\x00\x10\x00\x00\x00\x20\x00\x00\x00\x01\x00\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' return Response(favicon_data, mimetype='image/x-icon') @self.server.route('/') def index(): """Main dashboard page - loads existing annotations""" try: # Get all existing annotations annotations = self.annotation_manager.get_annotations() # Convert to serializable format annotations_data = [] for ann in annotations: if hasattr(ann, '__dict__'): ann_dict = ann.__dict__ else: ann_dict = ann # Ensure all fields are JSON serializable annotations_data.append({ 'annotation_id': ann_dict.get('annotation_id'), 'symbol': ann_dict.get('symbol'), 'timeframe': ann_dict.get('timeframe'), 'entry': ann_dict.get('entry'), 'exit': ann_dict.get('exit'), 'direction': ann_dict.get('direction'), 'profit_loss_pct': ann_dict.get('profit_loss_pct'), 'notes': ann_dict.get('notes', ''), 'created_at': ann_dict.get('created_at') }) logger.info(f"Loading dashboard with {len(annotations_data)} existing annotations") # Get symbols and timeframes from config symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT']) timeframes = self.config.get('timeframes', ['1s', '1m', '1h', '1d']) # Prepare template data template_data = { 'current_symbol': symbols[0] if symbols else 'ETH/USDT', # Use first symbol as default 'symbols': symbols, 'timeframes': timeframes, 'annotations': annotations_data } return render_template('annotation_dashboard.html', **template_data) except Exception as e: logger.error(f"Error rendering main page: {e}") # Fallback simple HTML page return f""" ANNOTATE - Manual Trade Annotation UI

📝 ANNOTATE - Manual Trade Annotation UI

System Status

✅ Annotation Manager: Active

⚠️ Data Provider: {'Available' if self.data_provider else 'Not Available (Standalone Mode)'}

⚠️ Trading Orchestrator: {'Available' if self.orchestrator else 'Not Available (Standalone Mode)'}

Available Features

  • Manual trade annotation
  • Test case generation
  • Annotation export
  • Real model training

API Endpoints

  • POST /api/chart-data - Get chart data
  • POST /api/save-annotation - Save annotation
  • POST /api/delete-annotation - Delete annotation
  • POST /api/generate-test-case - Generate test case
  • POST /api/export-annotations - Export annotations
Go to Dash Interface
""" @self.server.route('/api/chart-data', methods=['POST']) def get_chart_data(): """Get chart data for specified symbol and timeframes""" try: data = request.get_json() symbol = data.get('symbol', 'ETH/USDT') timeframes = data.get('timeframes', ['1s', '1m', '1h', '1d']) start_time_str = data.get('start_time') end_time_str = data.get('end_time') if not self.data_loader: return jsonify({ 'success': False, 'error': { 'code': 'DATA_LOADER_UNAVAILABLE', 'message': 'Data loader not available' } }) # Parse time strings if provided start_time = datetime.fromisoformat(start_time_str.replace('Z', '+00:00')) if start_time_str else None end_time = datetime.fromisoformat(end_time_str.replace('Z', '+00:00')) if end_time_str else None # Fetch data for each timeframe using data loader chart_data = {} for timeframe in timeframes: df = self.data_loader.get_data( symbol=symbol, timeframe=timeframe, start_time=start_time, end_time=end_time, limit=500 ) if df is not None and not df.empty: # Get pivot points for this timeframe pivot_markers = self._get_pivot_markers_for_timeframe(symbol, timeframe, df) # Convert to format suitable for Plotly chart_data[timeframe] = { 'timestamps': df.index.strftime('%Y-%m-%d %H:%M:%S').tolist(), 'open': df['open'].tolist(), 'high': df['high'].tolist(), 'low': df['low'].tolist(), 'close': df['close'].tolist(), 'volume': df['volume'].tolist(), 'pivot_markers': pivot_markers # Optional: only present if pivots exist } # Get pivot bounds for the symbol pivot_bounds = None if self.data_provider: try: pivot_bounds = self.data_provider.get_pivot_bounds(symbol) if pivot_bounds: logger.info(f"Found pivot bounds for {symbol}: {len(pivot_bounds.pivot_support_levels)} support, {len(pivot_bounds.pivot_resistance_levels)} resistance") except Exception as e: logger.error(f"Error getting pivot bounds: {e}") return jsonify({ 'success': True, 'chart_data': chart_data, 'pivot_bounds': { 'support_levels': pivot_bounds.pivot_support_levels if pivot_bounds else [], 'resistance_levels': pivot_bounds.pivot_resistance_levels if pivot_bounds else [], 'price_range': { 'min': pivot_bounds.price_min if pivot_bounds else None, 'max': pivot_bounds.price_max if pivot_bounds else None }, 'volume_range': { 'min': pivot_bounds.volume_min if pivot_bounds else None, 'max': pivot_bounds.volume_max if pivot_bounds else None }, 'timeframe': '1m', # Pivot bounds are calculated from 1m data 'period': '30 days', # Monthly data 'total_levels': len(pivot_bounds.pivot_support_levels) + len(pivot_bounds.pivot_resistance_levels) if pivot_bounds else 0 } if pivot_bounds else None }) except Exception as e: logger.error(f"Error fetching chart data: {e}") return jsonify({ 'success': False, 'error': { 'code': 'CHART_DATA_ERROR', 'message': str(e) } }) @self.server.route('/api/save-annotation', methods=['POST']) def save_annotation(): """Save a new annotation with full market context""" try: data = request.get_json() # Capture market state at entry and exit times using data provider entry_market_state = {} exit_market_state = {} if self.data_provider: try: # Parse timestamps entry_time = datetime.fromisoformat(data['entry']['timestamp'].replace('Z', '+00:00')) exit_time = datetime.fromisoformat(data['exit']['timestamp'].replace('Z', '+00:00')) # Use the new data provider method to get market state at entry time entry_market_state = self.data_provider.get_market_state_at_time( symbol=data['symbol'], timestamp=entry_time, context_window_minutes=5 ) # Use the new data provider method to get market state at exit time exit_market_state = self.data_provider.get_market_state_at_time( symbol=data['symbol'], timestamp=exit_time, context_window_minutes=5 ) logger.info(f"Captured market state: {len(entry_market_state)} timeframes at entry, {len(exit_market_state)} at exit") except Exception as e: logger.error(f"Error capturing market state: {e}") import traceback traceback.print_exc() # Create annotation with market context annotation = self.annotation_manager.create_annotation( entry_point=data['entry'], exit_point=data['exit'], symbol=data['symbol'], timeframe=data['timeframe'], entry_market_state=entry_market_state, exit_market_state=exit_market_state ) # Save annotation self.annotation_manager.save_annotation(annotation) # Automatically generate test case with ±5min data try: test_case = self.annotation_manager.generate_test_case( annotation, data_provider=self.data_provider, auto_save=True ) # Log test case details market_state = test_case.get('market_state', {}) timeframes_with_data = [k for k in market_state.keys() if k.startswith('ohlcv_')] logger.info(f"Auto-generated test case: {test_case['test_case_id']}") logger.info(f" Timeframes: {timeframes_with_data}") for tf_key in timeframes_with_data: candle_count = len(market_state[tf_key].get('timestamps', [])) logger.info(f" {tf_key}: {candle_count} candles") if 'training_labels' in market_state: logger.info(f" Training labels: {len(market_state['training_labels'].get('labels_1m', []))} labels") except Exception as e: logger.error(f"Failed to auto-generate test case: {e}") import traceback traceback.print_exc() return jsonify({ 'success': True, 'annotation': annotation.__dict__ if hasattr(annotation, '__dict__') else annotation }) except Exception as e: logger.error(f"Error saving annotation: {e}") return jsonify({ 'success': False, 'error': { 'code': 'SAVE_ANNOTATION_ERROR', 'message': str(e) } }) @self.server.route('/api/delete-annotation', methods=['POST']) def delete_annotation(): """Delete an annotation""" try: data = request.get_json() annotation_id = data['annotation_id'] # Delete annotation and check if it was found deleted = self.annotation_manager.delete_annotation(annotation_id) if deleted: return jsonify({ 'success': True, 'message': 'Annotation deleted successfully' }) else: return jsonify({ 'success': False, 'error': { 'code': 'ANNOTATION_NOT_FOUND', 'message': f'Annotation {annotation_id} not found' } }) except Exception as e: logger.error(f"Error deleting annotation: {e}", exc_info=True) return jsonify({ 'success': False, 'error': { 'code': 'DELETE_ANNOTATION_ERROR', 'message': str(e) } }) @self.server.route('/api/clear-all-annotations', methods=['POST']) def clear_all_annotations(): """Clear all annotations""" try: data = request.get_json() or {} symbol = data.get('symbol', None) # Use the efficient clear_all_annotations method deleted_count = self.annotation_manager.clear_all_annotations(symbol=symbol) if deleted_count == 0: return jsonify({ 'success': True, 'deleted_count': 0, 'message': 'No annotations to clear' }) logger.info(f"Cleared {deleted_count} annotations" + (f" for symbol {symbol}" if symbol else "")) return jsonify({ 'success': True, 'deleted_count': deleted_count, 'message': f'Cleared {deleted_count} annotations' }) except Exception as e: logger.error(f"Error clearing all annotations: {e}") import traceback logger.error(traceback.format_exc()) return jsonify({ 'success': False, 'error': { 'code': 'CLEAR_ALL_ANNOTATIONS_ERROR', 'message': str(e) } }) @self.server.route('/api/refresh-data', methods=['POST']) def refresh_data(): """Refresh chart data from data provider""" try: data = request.get_json() symbol = data.get('symbol', 'ETH/USDT') timeframes = data.get('timeframes', ['1s', '1m', '1h', '1d']) logger.info(f"Refreshing data for {symbol} with timeframes: {timeframes}") # Force refresh data from data provider chart_data = {} if self.data_provider: for timeframe in timeframes: try: # Force refresh by setting refresh=True df = self.data_provider.get_historical_data( symbol=symbol, timeframe=timeframe, limit=1000, refresh=True ) if df is not None and not df.empty: # Get pivot markers for this timeframe pivot_markers = self._get_pivot_markers_for_timeframe(symbol, timeframe, df) chart_data[timeframe] = { 'timestamps': df.index.strftime('%Y-%m-%d %H:%M:%S').tolist(), 'open': df['open'].tolist(), 'high': df['high'].tolist(), 'low': df['low'].tolist(), 'close': df['close'].tolist(), 'volume': df['volume'].tolist(), 'pivot_markers': pivot_markers # Optional: only present if pivots exist } logger.info(f"Refreshed {timeframe}: {len(df)} candles") else: logger.warning(f"No data available for {timeframe}") except Exception as e: logger.error(f"Error refreshing {timeframe} data: {e}") # Get pivot bounds for the symbol pivot_bounds = None if self.data_provider: try: pivot_bounds = self.data_provider.get_pivot_bounds(symbol) if pivot_bounds: logger.info(f"Found pivot bounds for {symbol}: {len(pivot_bounds.pivot_support_levels)} support, {len(pivot_bounds.pivot_resistance_levels)} resistance") except Exception as e: logger.error(f"Error getting pivot bounds: {e}") return jsonify({ 'success': True, 'chart_data': chart_data, 'pivot_bounds': { 'support_levels': pivot_bounds.pivot_support_levels if pivot_bounds else [], 'resistance_levels': pivot_bounds.pivot_resistance_levels if pivot_bounds else [], 'price_range': { 'min': pivot_bounds.price_min if pivot_bounds else None, 'max': pivot_bounds.price_max if pivot_bounds else None }, 'volume_range': { 'min': pivot_bounds.volume_min if pivot_bounds else None, 'max': pivot_bounds.volume_max if pivot_bounds else None }, 'timeframe': '1m', # Pivot bounds are calculated from 1m data 'period': '30 days', # Monthly data 'total_levels': len(pivot_bounds.pivot_support_levels) + len(pivot_bounds.pivot_resistance_levels) if pivot_bounds else 0 } if pivot_bounds else None, 'message': f'Refreshed data for {symbol}' }) except Exception as e: logger.error(f"Error refreshing data: {e}") return jsonify({ 'success': False, 'error': { 'code': 'REFRESH_DATA_ERROR', 'message': str(e) } }) @self.server.route('/api/generate-test-case', methods=['POST']) def generate_test_case(): """Generate test case from annotation""" try: data = request.get_json() annotation_id = data['annotation_id'] # Get annotation annotations = self.annotation_manager.get_annotations() annotation = next((a for a in annotations if (a.annotation_id if hasattr(a, 'annotation_id') else a.get('annotation_id')) == annotation_id), None) if not annotation: return jsonify({ 'success': False, 'error': { 'code': 'ANNOTATION_NOT_FOUND', 'message': 'Annotation not found' } }) # Generate test case with market context test_case = self.annotation_manager.generate_test_case( annotation, data_provider=self.data_provider ) return jsonify({ 'success': True, 'test_case': test_case }) except Exception as e: logger.error(f"Error generating test case: {e}") return jsonify({ 'success': False, 'error': { 'code': 'GENERATE_TESTCASE_ERROR', 'message': str(e) } }) @self.server.route('/api/export-annotations', methods=['POST']) def export_annotations(): """Export annotations to file""" try: data = request.get_json() symbol = data.get('symbol') format_type = data.get('format', 'json') # Get annotations annotations = self.annotation_manager.get_annotations(symbol=symbol) # Export to file output_path = self.annotation_manager.export_annotations( annotations=annotations, format_type=format_type ) return send_file(output_path, as_attachment=True) except Exception as e: logger.error(f"Error exporting annotations: {e}") return jsonify({ 'success': False, 'error': { 'code': 'EXPORT_ERROR', 'message': str(e) } }) @self.server.route('/api/train-model', methods=['POST']) def train_model(): """Start model training with annotations""" try: if not self.training_adapter: return jsonify({ 'success': False, 'error': { 'code': 'TRAINING_UNAVAILABLE', 'message': 'Real training adapter not available' } }) data = request.get_json() model_name = data['model_name'] annotation_ids = data.get('annotation_ids', []) # If no specific annotations provided, use all if not annotation_ids: annotations = self.annotation_manager.get_annotations() annotation_ids = [ a.annotation_id if hasattr(a, 'annotation_id') else a.get('annotation_id') for a in annotations ] # Load test cases from disk (they were auto-generated when annotations were saved) all_test_cases = self.annotation_manager.get_all_test_cases() # Filter to selected annotations test_cases = [ tc for tc in all_test_cases if tc['test_case_id'].replace('annotation_', '') in annotation_ids ] if not test_cases: return jsonify({ 'success': False, 'error': { 'code': 'NO_TEST_CASES', 'message': f'No test cases found for {len(annotation_ids)} annotations' } }) logger.info(f"Starting REAL training with {len(test_cases)} test cases for model {model_name}") # Start REAL training (NO SIMULATION!) training_id = self.training_adapter.start_training( model_name=model_name, test_cases=test_cases ) return jsonify({ 'success': True, 'training_id': training_id, 'test_cases_count': len(test_cases) }) except Exception as e: logger.error(f"Error starting training: {e}") return jsonify({ 'success': False, 'error': { 'code': 'TRAINING_ERROR', 'message': str(e) } }) @self.server.route('/api/training-progress', methods=['POST']) def get_training_progress(): """Get training progress""" try: if not self.training_adapter: return jsonify({ 'success': False, 'error': { 'code': 'TRAINING_UNAVAILABLE', 'message': 'Real training adapter not available' } }) data = request.get_json() training_id = data['training_id'] progress = self.training_adapter.get_training_progress(training_id) return jsonify({ 'success': True, 'progress': progress }) except Exception as e: logger.error(f"Error getting training progress: {e}") return jsonify({ 'success': False, 'error': { 'code': 'PROGRESS_ERROR', 'message': str(e) } }) @self.server.route('/api/available-models', methods=['GET']) def get_available_models(): """Get list of available models""" try: if not self.training_adapter: return jsonify({ 'success': False, 'error': { 'code': 'TRAINING_UNAVAILABLE', 'message': 'Real training adapter not available' } }) models = self.training_adapter.get_available_models() return jsonify({ 'success': True, 'models': models }) except Exception as e: logger.error(f"Error getting available models: {e}") return jsonify({ 'success': False, 'error': { 'code': 'MODEL_LIST_ERROR', 'message': str(e) } }) @self.server.route('/api/realtime-inference/start', methods=['POST']) def start_realtime_inference(): """Start real-time inference mode""" try: data = request.get_json() model_name = data.get('model_name') symbol = data.get('symbol', 'ETH/USDT') if not self.training_adapter: return jsonify({ 'success': False, 'error': { 'code': 'TRAINING_UNAVAILABLE', 'message': 'Real training adapter not available' } }) # Start real-time inference using orchestrator inference_id = self.training_adapter.start_realtime_inference( model_name=model_name, symbol=symbol, data_provider=self.data_provider ) return jsonify({ 'success': True, 'inference_id': inference_id }) except Exception as e: logger.error(f"Error starting real-time inference: {e}") return jsonify({ 'success': False, 'error': { 'code': 'INFERENCE_START_ERROR', 'message': str(e) } }) @self.server.route('/api/realtime-inference/stop', methods=['POST']) def stop_realtime_inference(): """Stop real-time inference mode""" try: data = request.get_json() inference_id = data.get('inference_id') if not self.training_adapter: return jsonify({ 'success': False, 'error': { 'code': 'TRAINING_UNAVAILABLE', 'message': 'Real training adapter not available' } }) self.training_adapter.stop_realtime_inference(inference_id) return jsonify({ 'success': True }) except Exception as e: logger.error(f"Error stopping real-time inference: {e}") return jsonify({ 'success': False, 'error': { 'code': 'INFERENCE_STOP_ERROR', 'message': str(e) } }) @self.server.route('/api/realtime-inference/signals', methods=['GET']) def get_realtime_signals(): """Get latest real-time inference signals""" try: if not self.training_adapter: return jsonify({ 'success': False, 'error': { 'code': 'TRAINING_UNAVAILABLE', 'message': 'Real training adapter not available' } }) signals = self.training_adapter.get_latest_signals() return jsonify({ 'success': True, 'signals': signals }) except Exception as e: logger.error(f"Error getting signals: {e}") return jsonify({ 'success': False, 'error': { 'code': 'SIGNALS_ERROR', 'message': str(e) } }) def run(self, host='127.0.0.1', port=8051, debug=False): """Run the application""" logger.info(f"Starting Annotation Dashboard on http://{host}:{port}") self.server.run(host=host, port=port, debug=debug) def main(): """Main entry point""" dashboard = AnnotationDashboard() dashboard.run(debug=True) if __name__ == '__main__': main()