Files
gogo2/ANNOTATE/web/app.py
Dobromir Popov f456b2747e wip
2025-10-22 17:48:44 +03:00

776 lines
32 KiB
Python

"""
Manual Trade Annotation UI - Main Application
A web-based interface for manually marking profitable buy/sell signals on historical
market data to generate training test cases for machine learning models.
"""
import os
import sys
from pathlib import Path
# Add parent directory to path for imports
parent_dir = Path(__file__).parent.parent.parent
sys.path.insert(0, str(parent_dir))
from flask import Flask, render_template, request, jsonify, send_file
from dash import Dash, html
import logging
from datetime import datetime
import json
# Pull in the main trading system; degrade to standalone mode if it is absent.
try:
    from core.data_provider import DataProvider
    from core.orchestrator import TradingOrchestrator
    from core.config import get_config
except ImportError as import_error:
    print(f"Warning: Could not import main system components: {import_error}")
    print("Running in standalone mode with limited functionality")
    # Falsy placeholders let later code test `if DataProvider` etc.
    DataProvider = TradingOrchestrator = None

    def get_config():
        """Return an empty configuration when the main system is unavailable."""
        return {}
# Import ANNOTATE modules
annotate_dir = Path(__file__).parent.parent
sys.path.insert(0, str(annotate_dir))
try:
    from core.annotation_manager import AnnotationManager
    from core.training_simulator import TrainingSimulator
    from core.data_loader import HistoricalDataLoader, TimeRangeManager
except ImportError:
    # Fall back to loading the ANNOTATE modules straight from their files.
    # NOTE(review): presumably needed when `core` has already been imported
    # from the main system above, so `core.<module>` cannot resolve to the
    # ANNOTATE package of the same name - confirm.
    import importlib.util

    def _load_annotate_module(module_name):
        """Load ``<annotate_dir>/core/<module_name>.py`` and return the module.

        Raises:
            ImportError: if no import spec/loader can be built for the file.
        """
        module_path = annotate_dir / "core" / f"{module_name}.py"
        spec = importlib.util.spec_from_file_location(module_name, module_path)
        if spec is None or spec.loader is None:
            raise ImportError(
                f"Cannot load ANNOTATE module {module_name!r} from {module_path}"
            )
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    # Load annotation_manager
    ann_module = _load_annotate_module("annotation_manager")
    AnnotationManager = ann_module.AnnotationManager
    # Load training_simulator
    train_module = _load_annotate_module("training_simulator")
    TrainingSimulator = train_module.TrainingSimulator
    # Load data_loader
    data_module = _load_annotate_module("data_loader")
    HistoricalDataLoader = data_module.HistoricalDataLoader
    TimeRangeManager = data_module.TimeRangeManager
# Configure root logging once, then grab this module's own logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class AnnotationDashboard:
    """Main annotation dashboard application.

    Builds a Flask server that serves the annotation page and a JSON API,
    mounts a placeholder Dash app under ``/dash/``, and wires both to the
    annotation manager, the historical data loader and the training
    simulator. Components from the main trading system are optional: when
    they are unavailable the matching attributes are ``None`` and the
    affected endpoints answer with an error payload.

    Every API error is returned as
    ``{'success': False, 'error': {'code': ..., 'message': ...}}``.
    """

    # Defaults used when a request does not name a symbol / timeframes.
    DEFAULT_SYMBOL = 'ETH/USDT'
    DEFAULT_TIMEFRAMES = ('1s', '1m', '1h', '1d')

    def __init__(self):
        """Initialize the dashboard: Flask, Dash, core components and routes."""
        # Load configuration
        self.config = get_config() if get_config else {}

        # Initialize Flask app
        self.server = Flask(
            __name__,
            template_folder='templates',
            static_folder='static'
        )

        # Initialize Dash app (optional component)
        self.app = Dash(
            __name__,
            server=self.server,
            url_base_pathname='/dash/',
            external_stylesheets=[
                'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css',
                'https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css'
            ]
        )

        # Set a simple Dash layout to avoid NoLayoutException
        self.app.layout = html.Div([
            html.H1("ANNOTATE Dashboard", className="text-center mb-4"),
            html.Div([
                html.P("This is the Dash component of the ANNOTATE system."),
                html.P("The main interface is available at the Flask routes."),
                html.A("Go to Main Interface", href="/", className="btn btn-primary")
            ], className="container")
        ])

        # Initialize core components
        self.data_provider = DataProvider() if DataProvider else None

        # Enable unified storage for real-time data access
        if self.data_provider:
            self._enable_unified_storage_async()

        self.orchestrator = TradingOrchestrator(
            data_provider=self.data_provider
        ) if TradingOrchestrator and self.data_provider else None

        # Initialize ANNOTATE components
        self.annotation_manager = AnnotationManager()
        self.training_simulator = TrainingSimulator(self.orchestrator) if self.orchestrator else None

        # Initialize data loader with existing DataProvider
        self.data_loader = HistoricalDataLoader(self.data_provider) if self.data_provider else None
        self.time_range_manager = TimeRangeManager(self.data_loader) if self.data_loader else None

        # Setup routes
        self._setup_routes()
        logger.info("Annotation Dashboard initialized")

    def _enable_unified_storage_async(self):
        """Enable the data provider's unified storage in a daemon thread.

        Failures are logged and otherwise ignored, so the dashboard keeps
        running on cached data.
        """
        import threading

        def enable_storage():
            try:
                import asyncio

                # This thread needs its own loop to drive the coroutine.
                # NOTE(review): the loop is intentionally left open - storage
                # objects created by enable_unified_storage() may stay bound
                # to it; confirm before adding loop.close().
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)

                # Enable unified storage
                success = loop.run_until_complete(
                    self.data_provider.enable_unified_storage()
                )
                if success:
                    logger.info("✅ ANNOTATE: Unified storage enabled for real-time data")
                    # Get statistics
                    stats = self.data_provider.get_unified_storage_stats()
                    if stats.get('initialized'):
                        logger.info(" Real-time data access: <10ms")
                        logger.info(" Historical data access: <100ms")
                        logger.info(" Annotation data: Available at any timestamp")
                else:
                    logger.warning("⚠️ ANNOTATE: Unified storage not available, using cached data only")
            except Exception as e:
                logger.warning(f"ANNOTATE: Could not enable unified storage: {e}")
                logger.info("ANNOTATE: Continuing with cached data access")

        # Start in background thread
        storage_thread = threading.Thread(target=enable_storage, daemon=True)
        storage_thread.start()

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _json_payload():
        """Return the request's JSON body as a dict, or ``{}`` if there is none.

        A missing, malformed or non-object body yields an empty dict so that
        handlers can call ``.get`` safely and report missing fields by name.
        """
        payload = request.get_json(silent=True)
        return payload if isinstance(payload, dict) else {}

    @staticmethod
    def _required_field(data, key):
        """Return ``data[key]``; raise ``ValueError`` naming the field if absent."""
        if key not in data:
            raise ValueError(f"Missing required field: {key}")
        return data[key]

    @staticmethod
    def _error_response(code, message):
        """Build the standard error JSON response used by all API routes."""
        return jsonify({
            'success': False,
            'error': {
                'code': code,
                'message': message
            }
        })

    def _training_unavailable(self):
        """Error response returned when no training simulator is configured."""
        return self._error_response(
            'TRAINING_UNAVAILABLE', 'Training simulator not available'
        )

    @staticmethod
    def _parse_iso_timestamp(value):
        """Parse an ISO-8601 string (a ``Z`` suffix is accepted) to a datetime.

        Returns ``None`` when ``value`` is empty or ``None``.
        """
        if not value:
            return None
        # 'Z' is rewritten to an explicit UTC offset before parsing.
        return datetime.fromisoformat(value.replace('Z', '+00:00'))

    @staticmethod
    def _df_to_ohlcv(df):
        """Convert an OHLCV DataFrame into plain lists for JSON/Plotly.

        Reads the columns ``open``, ``high``, ``low``, ``close`` and
        ``volume`` and formats the index with ``strftime``.
        """
        return {
            'timestamps': df.index.strftime('%Y-%m-%d %H:%M:%S').tolist(),
            'open': df['open'].tolist(),
            'high': df['high'].tolist(),
            'low': df['low'].tolist(),
            'close': df['close'].tolist(),
            'volume': df['volume'].tolist()
        }

    @staticmethod
    def _annotation_id(annotation):
        """Return the id of an annotation given as an object or as a dict."""
        if hasattr(annotation, 'annotation_id'):
            return annotation.annotation_id
        return annotation.get('annotation_id')

    @staticmethod
    def _serialize_annotation(annotation):
        """Reduce an annotation (object or dict) to the fields the page needs."""
        ann_dict = annotation.__dict__ if hasattr(annotation, '__dict__') else annotation
        return {
            'annotation_id': ann_dict.get('annotation_id'),
            'symbol': ann_dict.get('symbol'),
            'timeframe': ann_dict.get('timeframe'),
            'entry': ann_dict.get('entry'),
            'exit': ann_dict.get('exit'),
            'direction': ann_dict.get('direction'),
            'profit_loss_pct': ann_dict.get('profit_loss_pct'),
            'notes': ann_dict.get('notes', ''),
            'created_at': ann_dict.get('created_at')
        }

    def _capture_market_state(self, symbol, end_time):
        """Fetch up to 100 candles per default timeframe ending at ``end_time``.

        Returns a dict keyed ``ohlcv_<timeframe>``; timeframes for which the
        loader returns nothing are omitted.
        """
        state = {}
        for tf in self.DEFAULT_TIMEFRAMES:
            df = self.data_loader.get_data(
                symbol=symbol,
                timeframe=tf,
                end_time=end_time,
                limit=100
            )
            if df is not None and not df.empty:
                state[f'ohlcv_{tf}'] = self._df_to_ohlcv(df)
        return state

    def _render_fallback_page(self):
        """Return a static HTML status page used when the template fails."""
        return f"""
        <html>
        <head>
        <title>ANNOTATE - Manual Trade Annotation UI</title>
        <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
        </head>
        <body>
        <div class="container mt-5">
        <h1 class="text-center">📝 ANNOTATE - Manual Trade Annotation UI</h1>
        <div class="alert alert-info">
        <h4>System Status</h4>
        <p>✅ Annotation Manager: Active</p>
        <p>⚠️ Data Provider: {'Available' if self.data_provider else 'Not Available (Standalone Mode)'}</p>
        <p>⚠️ Trading Orchestrator: {'Available' if self.orchestrator else 'Not Available (Standalone Mode)'}</p>
        </div>
        <div class="row">
        <div class="col-md-6">
        <h3>Available Features</h3>
        <ul>
        <li>Manual trade annotation</li>
        <li>Test case generation</li>
        <li>Annotation export</li>
        <li>Training simulation</li>
        </ul>
        </div>
        <div class="col-md-6">
        <h3>API Endpoints</h3>
        <ul>
        <li><code>POST /api/chart-data</code> - Get chart data</li>
        <li><code>POST /api/save-annotation</code> - Save annotation</li>
        <li><code>POST /api/delete-annotation</code> - Delete annotation</li>
        <li><code>POST /api/generate-test-case</code> - Generate test case</li>
        <li><code>POST /api/export-annotations</code> - Export annotations</li>
        </ul>
        </div>
        </div>
        <div class="text-center mt-4">
        <a href="/dash/" class="btn btn-primary">Go to Dash Interface</a>
        </div>
        </div>
        </body>
        </html>
        """

    # ------------------------------------------------------------------
    # Route registration
    # ------------------------------------------------------------------
    def _setup_routes(self):
        """Setup Flask routes (page, annotation, training and inference APIs)."""
        self._register_page_routes()
        self._register_annotation_routes()
        self._register_training_routes()
        self._register_inference_routes()

    def _register_page_routes(self):
        """Register the main page and the chart-data endpoint."""

        @self.server.route('/')
        def index():
            """Main dashboard page - loads existing annotations"""
            try:
                # Get all existing annotations in a JSON-serializable form
                annotations_data = [
                    self._serialize_annotation(ann)
                    for ann in self.annotation_manager.get_annotations()
                ]
                logger.info(f"Loading dashboard with {len(annotations_data)} existing annotations")

                # Prepare template data
                template_data = {
                    'current_symbol': self.DEFAULT_SYMBOL,
                    'timeframes': list(self.DEFAULT_TIMEFRAMES),
                    'annotations': annotations_data
                }
                return render_template('annotation_dashboard.html', **template_data)
            except Exception as e:
                logger.error(f"Error rendering main page: {e}")
                # Fallback simple HTML page
                return self._render_fallback_page()

        @self.server.route('/api/chart-data', methods=['POST'])
        def get_chart_data():
            """Get chart data for specified symbol and timeframes"""
            try:
                data = self._json_payload()
                symbol = data.get('symbol', self.DEFAULT_SYMBOL)
                timeframes = data.get('timeframes', list(self.DEFAULT_TIMEFRAMES))

                if not self.data_loader:
                    return self._error_response(
                        'DATA_LOADER_UNAVAILABLE', 'Data loader not available'
                    )

                # Parse time strings if provided
                start_time = self._parse_iso_timestamp(data.get('start_time'))
                end_time = self._parse_iso_timestamp(data.get('end_time'))

                # Fetch data for each timeframe using data loader
                chart_data = {}
                for timeframe in timeframes:
                    df = self.data_loader.get_data(
                        symbol=symbol,
                        timeframe=timeframe,
                        start_time=start_time,
                        end_time=end_time,
                        limit=500
                    )
                    if df is not None and not df.empty:
                        # Convert to format suitable for Plotly
                        chart_data[timeframe] = self._df_to_ohlcv(df)

                return jsonify({
                    'success': True,
                    'chart_data': chart_data
                })
            except Exception as e:
                logger.error(f"Error fetching chart data: {e}")
                return self._error_response('CHART_DATA_ERROR', str(e))

    def _register_annotation_routes(self):
        """Register save/delete/test-case/export endpoints for annotations."""

        @self.server.route('/api/save-annotation', methods=['POST'])
        def save_annotation():
            """Save a new annotation with full market context"""
            try:
                data = self._json_payload()
                entry_point = self._required_field(data, 'entry')
                exit_point = self._required_field(data, 'exit')
                symbol = self._required_field(data, 'symbol')
                timeframe = self._required_field(data, 'timeframe')

                # Capture market state at entry and exit times. This is
                # best-effort: on failure the annotation is still saved,
                # with whatever state was collected so far.
                entry_market_state = {}
                exit_market_state = {}
                if self.data_loader:
                    try:
                        # Parse timestamps
                        entry_time = self._parse_iso_timestamp(entry_point['timestamp'])
                        exit_time = self._parse_iso_timestamp(exit_point['timestamp'])
                        if entry_time is None or exit_time is None:
                            # Without this guard a None end_time would reach
                            # the loader and fetch unrelated data.
                            raise ValueError("entry/exit timestamp is missing")

                        # Fetch market data for all timeframes at entry time
                        entry_market_state = self._capture_market_state(symbol, entry_time)
                        # Fetch market data at exit time
                        exit_market_state = self._capture_market_state(symbol, exit_time)
                        logger.info(f"Captured market state: {len(entry_market_state)} timeframes at entry, {len(exit_market_state)} at exit")
                    except Exception as e:
                        logger.error(f"Error capturing market state: {e}")

                # Create annotation with market context
                annotation = self.annotation_manager.create_annotation(
                    entry_point=entry_point,
                    exit_point=exit_point,
                    symbol=symbol,
                    timeframe=timeframe,
                    entry_market_state=entry_market_state,
                    exit_market_state=exit_market_state
                )

                # Save annotation
                self.annotation_manager.save_annotation(annotation)

                # Automatically generate test case with ±5min data; a failure
                # here is logged but does not fail the save.
                try:
                    test_case = self.annotation_manager.generate_test_case(
                        annotation,
                        data_provider=self.data_provider,
                        auto_save=True
                    )
                    # Log test case details
                    market_state = test_case.get('market_state', {})
                    timeframes_with_data = [k for k in market_state.keys() if k.startswith('ohlcv_')]
                    logger.info(f"Auto-generated test case: {test_case['test_case_id']}")
                    logger.info(f" Timeframes: {timeframes_with_data}")
                    for tf_key in timeframes_with_data:
                        candle_count = len(market_state[tf_key].get('timestamps', []))
                        logger.info(f" {tf_key}: {candle_count} candles")
                    if 'training_labels' in market_state:
                        logger.info(f" Training labels: {len(market_state['training_labels'].get('labels_1m', []))} labels")
                except Exception as e:
                    # logger.exception keeps the traceback in the log stream.
                    logger.exception(f"Failed to auto-generate test case: {e}")

                return jsonify({
                    'success': True,
                    'annotation': annotation.__dict__ if hasattr(annotation, '__dict__') else annotation
                })
            except Exception as e:
                logger.error(f"Error saving annotation: {e}")
                return self._error_response('SAVE_ANNOTATION_ERROR', str(e))

        @self.server.route('/api/delete-annotation', methods=['POST'])
        def delete_annotation():
            """Delete an annotation"""
            try:
                data = self._json_payload()
                annotation_id = self._required_field(data, 'annotation_id')
                self.annotation_manager.delete_annotation(annotation_id)
                return jsonify({'success': True})
            except Exception as e:
                logger.error(f"Error deleting annotation: {e}")
                return self._error_response('DELETE_ANNOTATION_ERROR', str(e))

        @self.server.route('/api/generate-test-case', methods=['POST'])
        def generate_test_case():
            """Generate test case from annotation"""
            try:
                data = self._json_payload()
                annotation_id = self._required_field(data, 'annotation_id')

                # Get annotation
                annotations = self.annotation_manager.get_annotations()
                annotation = next(
                    (a for a in annotations if self._annotation_id(a) == annotation_id),
                    None
                )
                if not annotation:
                    return self._error_response(
                        'ANNOTATION_NOT_FOUND', 'Annotation not found'
                    )

                # Generate test case with market context
                test_case = self.annotation_manager.generate_test_case(
                    annotation,
                    data_provider=self.data_provider
                )
                return jsonify({
                    'success': True,
                    'test_case': test_case
                })
            except Exception as e:
                logger.error(f"Error generating test case: {e}")
                return self._error_response('GENERATE_TESTCASE_ERROR', str(e))

        @self.server.route('/api/export-annotations', methods=['POST'])
        def export_annotations():
            """Export annotations to file"""
            try:
                data = self._json_payload()
                symbol = data.get('symbol')
                format_type = data.get('format', 'json')

                # Get annotations
                annotations = self.annotation_manager.get_annotations(symbol=symbol)

                # Export to file
                output_path = self.annotation_manager.export_annotations(
                    annotations=annotations,
                    format_type=format_type
                )
                return send_file(output_path, as_attachment=True)
            except Exception as e:
                logger.error(f"Error exporting annotations: {e}")
                return self._error_response('EXPORT_ERROR', str(e))

    def _register_training_routes(self):
        """Register training start/progress and model-list endpoints."""

        @self.server.route('/api/train-model', methods=['POST'])
        def train_model():
            """Start model training with annotations"""
            try:
                if not self.training_simulator:
                    return self._training_unavailable()

                data = self._json_payload()
                model_name = self._required_field(data, 'model_name')
                annotation_ids = data.get('annotation_ids', [])

                # If no specific annotations provided, use all
                if not annotation_ids:
                    annotation_ids = [
                        self._annotation_id(a)
                        for a in self.annotation_manager.get_annotations()
                    ]

                # Load test cases from disk (they were auto-generated when annotations were saved)
                all_test_cases = self.annotation_manager.get_all_test_cases()

                # Filter to selected annotations
                test_cases = [
                    tc for tc in all_test_cases
                    if tc['test_case_id'].replace('annotation_', '') in annotation_ids
                ]
                if not test_cases:
                    return self._error_response(
                        'NO_TEST_CASES',
                        f'No test cases found for {len(annotation_ids)} annotations'
                    )

                logger.info(f"Starting training with {len(test_cases)} test cases for model {model_name}")

                # Start training
                training_id = self.training_simulator.start_training(
                    model_name=model_name,
                    test_cases=test_cases
                )
                return jsonify({
                    'success': True,
                    'training_id': training_id,
                    'test_cases_count': len(test_cases)
                })
            except Exception as e:
                logger.error(f"Error starting training: {e}")
                return self._error_response('TRAINING_ERROR', str(e))

        @self.server.route('/api/training-progress', methods=['POST'])
        def get_training_progress():
            """Get training progress"""
            try:
                if not self.training_simulator:
                    return self._training_unavailable()

                data = self._json_payload()
                training_id = self._required_field(data, 'training_id')
                progress = self.training_simulator.get_training_progress(training_id)
                return jsonify({
                    'success': True,
                    'progress': progress
                })
            except Exception as e:
                logger.error(f"Error getting training progress: {e}")
                return self._error_response('PROGRESS_ERROR', str(e))

        @self.server.route('/api/available-models', methods=['GET'])
        def get_available_models():
            """Get list of available models"""
            try:
                if not self.training_simulator:
                    return self._training_unavailable()

                models = self.training_simulator.get_available_models()
                return jsonify({
                    'success': True,
                    'models': models
                })
            except Exception as e:
                logger.error(f"Error getting available models: {e}")
                return self._error_response('MODEL_LIST_ERROR', str(e))

    def _register_inference_routes(self):
        """Register real-time inference start/stop/signals endpoints."""

        @self.server.route('/api/realtime-inference/start', methods=['POST'])
        def start_realtime_inference():
            """Start real-time inference mode"""
            try:
                data = self._json_payload()
                model_name = data.get('model_name')
                symbol = data.get('symbol', self.DEFAULT_SYMBOL)

                if not self.training_simulator:
                    return self._training_unavailable()

                # Start real-time inference
                inference_id = self.training_simulator.start_realtime_inference(
                    model_name=model_name,
                    symbol=symbol,
                    data_provider=self.data_provider
                )
                return jsonify({
                    'success': True,
                    'inference_id': inference_id
                })
            except Exception as e:
                logger.error(f"Error starting real-time inference: {e}")
                return self._error_response('INFERENCE_START_ERROR', str(e))

        @self.server.route('/api/realtime-inference/stop', methods=['POST'])
        def stop_realtime_inference():
            """Stop real-time inference mode"""
            try:
                data = self._json_payload()
                inference_id = data.get('inference_id')

                if not self.training_simulator:
                    return self._training_unavailable()

                self.training_simulator.stop_realtime_inference(inference_id)
                return jsonify({
                    'success': True
                })
            except Exception as e:
                logger.error(f"Error stopping real-time inference: {e}")
                return self._error_response('INFERENCE_STOP_ERROR', str(e))

        @self.server.route('/api/realtime-inference/signals', methods=['GET'])
        def get_realtime_signals():
            """Get latest real-time inference signals"""
            try:
                if not self.training_simulator:
                    return self._training_unavailable()

                signals = self.training_simulator.get_latest_signals()
                return jsonify({
                    'success': True,
                    'signals': signals
                })
            except Exception as e:
                logger.error(f"Error getting signals: {e}")
                return self._error_response('SIGNALS_ERROR', str(e))

    def run(self, host='127.0.0.1', port=8051, debug=False):
        """Run the application on Flask's built-in server.

        Args:
            host: Interface to bind to.
            port: TCP port to listen on.
            debug: Passed straight through to ``Flask.run``.
        """
        logger.info(f"Starting Annotation Dashboard on http://{host}:{port}")
        self.server.run(host=host, port=port, debug=debug)
def main():
    """Create the annotation dashboard and serve it in debug mode."""
    AnnotationDashboard().run(debug=True)


# Only start the server when executed as a script, not on import.
if __name__ == '__main__':
    main()