"""
Manual Trade Annotation UI - Main Application
A web-based interface for manually marking profitable buy/sell signals on historical
market data to generate training test cases for machine learning models.
"""
import os
import sys
from pathlib import Path
# Add parent directory to path for imports
parent_dir = Path(__file__).parent.parent.parent
sys.path.insert(0, str(parent_dir))
from flask import Flask, render_template, request, jsonify, send_file
from dash import Dash, html
import logging
from datetime import datetime
from typing import Optional, Dict, List, Any
import json
import pandas as pd
import numpy as np
# Import core components from main system
try:
from core.data_provider import DataProvider
from core.orchestrator import TradingOrchestrator
from core.config import get_config
from core.williams_market_structure import WilliamsMarketStructure
except ImportError as e:
print(f"Warning: Could not import main system components: {e}")
print("Running in standalone mode with limited functionality")
DataProvider = None
WilliamsMarketStructure = None
TradingOrchestrator = None
get_config = lambda: {}
# Import ANNOTATE modules
annotate_dir = Path(__file__).parent.parent
sys.path.insert(0, str(annotate_dir))
try:
from core.annotation_manager import AnnotationManager
from core.real_training_adapter import RealTrainingAdapter
from core.data_loader import HistoricalDataLoader, TimeRangeManager
except ImportError:
# Try alternative import path
import importlib.util
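    # Each module below is loaded with the standard importlib recipe
    # (spec_from_file_location -> module_from_spec -> exec_module), which
    # imports directly from a file path without needing the package on sys.path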
# Load annotation_manager
ann_spec = importlib.util.spec_from_file_location(
"annotation_manager",
annotate_dir / "core" / "annotation_manager.py"
)
ann_module = importlib.util.module_from_spec(ann_spec)
ann_spec.loader.exec_module(ann_module)
AnnotationManager = ann_module.AnnotationManager
# Load real_training_adapter (NO SIMULATION!)
train_spec = importlib.util.spec_from_file_location(
"real_training_adapter",
annotate_dir / "core" / "real_training_adapter.py"
)
train_module = importlib.util.module_from_spec(train_spec)
train_spec.loader.exec_module(train_module)
RealTrainingAdapter = train_module.RealTrainingAdapter
# Load data_loader
data_spec = importlib.util.spec_from_file_location(
"data_loader",
annotate_dir / "core" / "data_loader.py"
)
data_module = importlib.util.module_from_spec(data_spec)
data_spec.loader.exec_module(data_module)
HistoricalDataLoader = data_module.HistoricalDataLoader
TimeRangeManager = data_module.TimeRangeManager
# Setup logging - configure before any logging occurs
log_dir = Path(__file__).parent.parent / 'logs'
log_dir.mkdir(exist_ok=True)
log_file = log_dir / 'annotate_app.log'
# Configure logging to both file and console
# File mode 'w' truncates the file on each run
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, mode='w'), # Truncate on each run
logging.StreamHandler(sys.stdout) # Also print to console
]
)
logger = logging.getLogger(__name__)
logger.info(f"Logging to: {log_file}")
class AnnotationDashboard:
"""Main annotation dashboard application"""
def __init__(self):
"""Initialize the dashboard"""
# Load configuration
try:
# Always try YAML loading first since get_config might not work in standalone mode
import yaml
with open('config.yaml', 'r') as f:
self.config = yaml.safe_load(f)
logger.info(f"Loaded config via YAML: {len(self.config)} keys")
except Exception as e:
logger.warning(f"Could not load config via YAML: {e}")
try:
# Fallback to get_config if available
if get_config:
self.config = get_config()
logger.info(f"Loaded config via get_config: {len(self.config)} keys")
else:
raise Exception("get_config not available")
except Exception as e2:
logger.warning(f"Could not load config via get_config: {e2}")
# Final fallback config with SOL/USDT
self.config = {
'symbols': ['ETH/USDT', 'BTC/USDT', 'SOL/USDT'],
'timeframes': ['1s', '1m', '1h', '1d']
}
logger.info("Using fallback config")
# Initialize Flask app
self.server = Flask(
__name__,
template_folder='templates',
static_folder='static'
)
# Suppress werkzeug request logs (reduce noise from polling endpoints)
werkzeug_logger = logging.getLogger('werkzeug')
werkzeug_logger.setLevel(logging.WARNING) # Only show warnings and errors, not INFO
# Initialize Dash app (optional component)
self.app = Dash(
__name__,
server=self.server,
url_base_pathname='/dash/',
external_stylesheets=[
'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css',
'https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css'
]
)
# Set a simple Dash layout to avoid NoLayoutException
self.app.layout = html.Div([
html.H1("ANNOTATE Dashboard", className="text-center mb-4"),
html.Div([
html.P("This is the Dash component of the ANNOTATE system."),
html.P("The main interface is available at the Flask routes."),
html.A("Go to Main Interface", href="/", className="btn btn-primary")
], className="container")
])
# Initialize core components (skip initial load for fast startup)
self.data_provider = DataProvider(skip_initial_load=True) if DataProvider else None
# Enable unified storage for real-time data access
if self.data_provider:
self._enable_unified_storage_async()
# ANNOTATE doesn't need orchestrator immediately - lazy load on demand
self.orchestrator = None
self.models_loading = False
self.available_models = ['DQN', 'CNN', 'Transformer'] # Models that CAN be loaded
self.loaded_models = {} # Models that ARE loaded: {name: model_instance}
# Initialize ANNOTATE components
self.annotation_manager = AnnotationManager()
# Use REAL training adapter - NO SIMULATION!
self.training_adapter = RealTrainingAdapter(None, self.data_provider)
# Don't auto-load models - wait for user to click LOAD button
logger.info("Models available for lazy loading: " + ", ".join(self.available_models))
# Initialize data loader with existing DataProvider
self.data_loader = HistoricalDataLoader(self.data_provider) if self.data_provider else None
self.time_range_manager = TimeRangeManager(self.data_loader) if self.data_loader else None
# Setup routes
self._setup_routes()
# Start background data refresh after startup
if self.data_loader:
self._start_background_data_refresh()
logger.info("Annotation Dashboard initialized")
def _get_best_checkpoint_info(self, model_name: str) -> Optional[Dict]:
"""
Get best checkpoint info for a model without loading it
First tries database, then falls back to filename parsing
Args:
model_name: Name of the model
Returns:
Dict with checkpoint info or None if no checkpoint found
"""
try:
# Try to get from database first (has full metadata)
try:
from utils.database_manager import DatabaseManager
db_manager = DatabaseManager()
# Get active checkpoint for this model
with db_manager._get_connection() as conn:
cursor = conn.execute("""
SELECT checkpoint_id, performance_metrics, timestamp, file_path
FROM checkpoint_metadata
WHERE model_name = ? AND is_active = TRUE
ORDER BY timestamp DESC
LIMIT 1
""", (model_name.lower(),))
row = cursor.fetchone()
if row:
checkpoint_id, metrics_json, timestamp, file_path = row
metrics = json.loads(metrics_json) if metrics_json else {}
checkpoint_info = {
'filename': os.path.basename(file_path) if file_path else checkpoint_id,
'epoch': metrics.get('epoch', 0),
'loss': metrics.get('loss'),
'accuracy': metrics.get('accuracy'),
'source': 'database'
}
logger.info(f"Loaded checkpoint info from database for {model_name}: E{checkpoint_info['epoch']}, Loss={checkpoint_info['loss']}, Acc={checkpoint_info['accuracy']}")
return checkpoint_info
except Exception as db_error:
logger.debug(f"Could not load from database: {db_error}")
# Fallback to filename parsing
import glob
import re
# Map model names to checkpoint directories
checkpoint_dirs = {
'Transformer': 'models/checkpoints/transformer',
'CNN': 'models/checkpoints/enhanced_cnn',
'DQN': 'models/checkpoints/dqn_agent'
}
checkpoint_dir = checkpoint_dirs.get(model_name)
if not checkpoint_dir:
return None
if not os.path.exists(checkpoint_dir):
logger.debug(f"Checkpoint directory not found: {checkpoint_dir}")
return None
# Find all checkpoint files
checkpoint_files = glob.glob(os.path.join(checkpoint_dir, '*.pt'))
if not checkpoint_files:
logger.debug(f"No checkpoint files found in {checkpoint_dir}")
return None
logger.debug(f"Found {len(checkpoint_files)} checkpoints for {model_name}")
# Parse filenames to extract epoch info
# Format: transformer_epoch5_20251110_123620.pt
best_checkpoint = None
best_epoch = -1
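            # Heuristic: without opening each file, take the highest epoch
            # number embedded in the filename as the best checkpoint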
for cp_file in checkpoint_files:
try:
filename = os.path.basename(cp_file)
# Extract epoch number from filename
match = re.search(r'epoch(\d+)', filename, re.IGNORECASE)
if match:
epoch = int(match.group(1))
if epoch > best_epoch:
best_epoch = epoch
best_checkpoint = {
'filename': filename,
'epoch': epoch,
'loss': None, # Can't get without loading
'accuracy': None, # Can't get without loading
'source': 'filename'
}
logger.debug(f"Found checkpoint: {filename}, epoch {epoch}")
except Exception as e:
logger.debug(f"Could not parse checkpoint {cp_file}: {e}")
continue
if best_checkpoint:
logger.info(f"Best checkpoint for {model_name}: {best_checkpoint['filename']} (E{best_checkpoint['epoch']})")
return best_checkpoint
except Exception as e:
logger.error(f"Error getting checkpoint info for {model_name}: {e}")
import traceback
logger.error(traceback.format_exc())
return None
def _load_model_lazy(self, model_name: str) -> dict:
"""
Lazy load a specific model on demand
Args:
model_name: Name of model to load ('DQN', 'CNN', 'Transformer')
Returns:
dict: Result with success status and message
"""
try:
# Check if already loaded
if model_name in self.loaded_models:
return {
'success': True,
'message': f'{model_name} already loaded',
'already_loaded': True
}
# Check if model is available
if model_name not in self.available_models:
return {
'success': False,
'error': f'{model_name} is not in available models list'
}
logger.info(f"Loading {model_name} model...")
# Initialize orchestrator if not already done
if not self.orchestrator:
if not TradingOrchestrator:
return {
'success': False,
'error': 'TradingOrchestrator class not available'
}
logger.info("Creating TradingOrchestrator instance...")
self.orchestrator = TradingOrchestrator(
data_provider=self.data_provider,
enhanced_rl_training=True
)
logger.info("Orchestrator created")
# Update training adapter
self.training_adapter.orchestrator = self.orchestrator
# Load specific model
if model_name == 'DQN':
if not hasattr(self.orchestrator, 'rl_agent') or not self.orchestrator.rl_agent:
# Initialize RL agent
self.orchestrator._initialize_rl_agent()
self.loaded_models['DQN'] = self.orchestrator.rl_agent
elif model_name == 'CNN':
if not hasattr(self.orchestrator, 'cnn_model') or not self.orchestrator.cnn_model:
# Initialize CNN model
self.orchestrator._initialize_cnn_model()
self.loaded_models['CNN'] = self.orchestrator.cnn_model
elif model_name == 'Transformer':
if not hasattr(self.orchestrator, 'primary_transformer') or not self.orchestrator.primary_transformer:
# Initialize Transformer model
self.orchestrator._initialize_transformer_model()
self.loaded_models['Transformer'] = self.orchestrator.primary_transformer
else:
return {
'success': False,
'error': f'Unknown model: {model_name}'
}
logger.info(f"{model_name} model loaded successfully")
return {
'success': True,
'message': f'{model_name} loaded successfully',
'loaded_models': list(self.loaded_models.keys())
}
except Exception as e:
logger.error(f"Error loading {model_name}: {e}")
import traceback
logger.error(f"Traceback:\n{traceback.format_exc()}")
return {
'success': False,
'error': str(e)
}
def _enable_unified_storage_async(self):
"""Enable unified storage system in background thread"""
def enable_storage():
try:
                import asyncio
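                # Each background thread needs its own asyncio event loop;
                # create one and drive the async storage bring-up to completion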
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Enable unified storage
success = loop.run_until_complete(
self.data_provider.enable_unified_storage()
)
if success:
logger.info(" ANNOTATE: Unified storage enabled for real-time data")
# Get statistics
stats = self.data_provider.get_unified_storage_stats()
if stats.get('initialized'):
logger.info(" Real-time data access: <10ms")
logger.info(" Historical data access: <100ms")
logger.info(" Annotation data: Available at any timestamp")
else:
logger.warning(" ANNOTATE: Unified storage not available, using cached data only")
except Exception as e:
logger.warning(f"ANNOTATE: Could not enable unified storage: {e}")
logger.info("ANNOTATE: Continuing with cached data access")
# Start in background thread
import threading
storage_thread = threading.Thread(target=enable_storage, daemon=True)
storage_thread.start()
def _start_background_data_refresh(self):
"""Start background task to refresh recent data after startup - ONCE ONLY"""
def refresh_recent_data():
try:
import time
# Wait for app to fully start
time.sleep(5)
logger.info(" Starting one-time background data refresh (fetching only recent missing data)")
# Disable startup mode to fetch fresh data
self.data_loader.disable_startup_mode()
# Use the new on-demand refresh method
logger.info("Using on-demand refresh for recent data")
self.data_provider.refresh_data_on_demand()
logger.info(" One-time background data refresh completed")
except Exception as e:
logger.error(f"Error in background data refresh: {e}")
# Start refresh in background thread
import threading
refresh_thread = threading.Thread(target=refresh_recent_data, daemon=True)
refresh_thread.start()
logger.info("One-time background data refresh scheduled")
def _get_pivot_markers_for_timeframe(self, symbol: str, timeframe: str, df: pd.DataFrame) -> dict:
"""
Get pivot markers for a specific timeframe using WilliamsMarketStructure directly
Returns dict with all pivot points and identifies which are the last high/low per level
"""
try:
if WilliamsMarketStructure is None:
logger.warning("WilliamsMarketStructure not available")
return {}
if df is None or len(df) < 10:
logger.warning(f"Insufficient data for pivot calculation: {len(df) if df is not None else 0} bars")
return {}
# Convert DataFrame to numpy array format expected by Williams Market Structure
ohlcv_array = df[['open', 'high', 'low', 'close', 'volume']].copy()
# Add timestamp as first column (convert to milliseconds)
timestamps = df.index.astype(np.int64) // 10**6 # pandas index is ns -> convert to ms
ohlcv_array.insert(0, 'timestamp', timestamps)
ohlcv_array = ohlcv_array.to_numpy()
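            # Resulting ndarray columns: [timestamp_ms, open, high, low, close, volume]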
            # Initialize Williams Market Structure; the constructor default is
            # overridden by the min_pivot_distance passed to the calculation below
            williams = WilliamsMarketStructure(min_pivot_distance=1)
# Calculate recursive pivot points with min_pivot_distance=2
# This ensures 5 candles per pivot (tip + 2 prev + 2 next)
pivot_levels = williams.calculate_recursive_pivot_points(
ohlcv_array,
min_pivot_distance=2
)
if not pivot_levels:
logger.debug(f"No pivot levels found for {symbol} {timeframe}")
return {}
# Build a map of timestamp -> pivot info
# Also track last high/low per level for drawing horizontal lines
pivot_map = {}
last_pivots = {} # {level: {'high': (ts_str, idx), 'low': (ts_str, idx)}}
# For each level (1-5), collect ALL pivot points
for level_num, trend_level in pivot_levels.items():
if not hasattr(trend_level, 'pivot_points') or not trend_level.pivot_points:
continue
last_pivots[level_num] = {'high': None, 'low': None}
# Add ALL pivot points to the map
for pivot in trend_level.pivot_points:
ts_str = pivot.timestamp.strftime('%Y-%m-%d %H:%M:%S')
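                    # Multiple pivots (one per level) can land on the same candle,
                    # so each timestamp key holds lists of highs and lows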
if ts_str not in pivot_map:
pivot_map[ts_str] = {'highs': [], 'lows': []}
pivot_info = {
'level': level_num,
'price': pivot.price,
'strength': pivot.strength,
'is_last': False # Will be updated below
}
if pivot.pivot_type == 'high':
pivot_map[ts_str]['highs'].append(pivot_info)
last_pivots[level_num]['high'] = (ts_str, len(pivot_map[ts_str]['highs']) - 1)
elif pivot.pivot_type == 'low':
pivot_map[ts_str]['lows'].append(pivot_info)
last_pivots[level_num]['low'] = (ts_str, len(pivot_map[ts_str]['lows']) - 1)
# Mark the last high and last low for each level
for level_num, last_info in last_pivots.items():
if last_info['high']:
ts_str, idx = last_info['high']
pivot_map[ts_str]['highs'][idx]['is_last'] = True
if last_info['low']:
ts_str, idx = last_info['low']
pivot_map[ts_str]['lows'][idx]['is_last'] = True
logger.info(f"Found {len(pivot_map)} pivot candles for {symbol} {timeframe} (from {len(df)} candles)")
return pivot_map
except Exception as e:
logger.error(f"Error getting pivot markers for {timeframe}: {e}")
import traceback
logger.error(traceback.format_exc())
return {}
def _setup_routes(self):
"""Setup Flask routes"""
@self.server.route('/favicon.ico')
def favicon():
"""Serve favicon to prevent 404 errors"""
from flask import Response
            # Return a minimal ICO stub (16x16 header, truncated payload) so
            # browsers stop re-requesting the favicon
favicon_data = b'\x00\x00\x01\x00\x01\x00\x10\x10\x00\x00\x01\x00\x20\x00\x68\x04\x00\x00\x16\x00\x00\x00\x28\x00\x00\x00\x10\x00\x00\x00\x20\x00\x00\x00\x01\x00\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
return Response(favicon_data, mimetype='image/x-icon')
@self.server.route('/')
def index():
"""Main dashboard page - loads existing annotations"""
try:
# Get all existing annotations
annotations = self.annotation_manager.get_annotations()
# Convert to serializable format
annotations_data = []
for ann in annotations:
if hasattr(ann, '__dict__'):
ann_dict = ann.__dict__
else:
ann_dict = ann
# Ensure all fields are JSON serializable
annotations_data.append({
'annotation_id': ann_dict.get('annotation_id'),
'symbol': ann_dict.get('symbol'),
'timeframe': ann_dict.get('timeframe'),
'entry': ann_dict.get('entry'),
'exit': ann_dict.get('exit'),
'direction': ann_dict.get('direction'),
'profit_loss_pct': ann_dict.get('profit_loss_pct'),
'notes': ann_dict.get('notes', ''),
'created_at': ann_dict.get('created_at')
})
logger.info(f"Loading dashboard with {len(annotations_data)} existing annotations")
# Get symbols and timeframes from config
symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT'])
timeframes = self.config.get('timeframes', ['1s', '1m', '1h', '1d'])
# Prepare template data
template_data = {
'current_symbol': symbols[0] if symbols else 'ETH/USDT', # Use first symbol as default
'symbols': symbols,
'timeframes': timeframes,
'annotations': annotations_data
}
return render_template('annotation_dashboard.html', **template_data)
except Exception as e:
logger.error(f"Error rendering main page: {e}")
# Fallback simple HTML page
return f"""
ANNOTATE - Manual Trade Annotation UI
📝 ANNOTATE - Manual Trade Annotation UI
System Status
Annotation Manager: Active
Data Provider: {'Available' if self.data_provider else 'Not Available (Standalone Mode)'}
Trading Orchestrator: {'Available' if self.orchestrator else 'Not Available (Standalone Mode)'}