"""
Manual Trade Annotation UI - Main Application
A web-based interface for manually marking profitable buy/sell signals on historical
market data to generate training test cases for machine learning models.
"""
import os
import sys
from pathlib import Path
# Add parent directory to path for imports
parent_dir = Path(__file__).parent.parent.parent
sys.path.insert(0, str(parent_dir))
from flask import Flask, render_template, request, jsonify, send_file
from dash import Dash, html
import logging
from datetime import datetime
from typing import Optional, Dict, List, Any
import json
import pandas as pd
import numpy as np
import threading
import uuid
import time
import torch
# Import core components from main system
try:
from core.data_provider import DataProvider
from core.orchestrator import TradingOrchestrator
from core.config import get_config
from core.williams_market_structure import WilliamsMarketStructure
except ImportError as e:
print(f"Warning: Could not import main system components: {e}")
print("Running in standalone mode with limited functionality")
DataProvider = None
WilliamsMarketStructure = None
TradingOrchestrator = None
get_config = lambda: {}
# Import ANNOTATE modules
annotate_dir = Path(__file__).parent.parent
sys.path.insert(0, str(annotate_dir))
try:
from core.annotation_manager import AnnotationManager
from core.real_training_adapter import RealTrainingAdapter
from core.data_loader import HistoricalDataLoader, TimeRangeManager
except ImportError:
# Try alternative import path
import importlib.util
# Load annotation_manager
ann_spec = importlib.util.spec_from_file_location(
"annotation_manager",
annotate_dir / "core" / "annotation_manager.py"
)
ann_module = importlib.util.module_from_spec(ann_spec)
ann_spec.loader.exec_module(ann_module)
AnnotationManager = ann_module.AnnotationManager
# Load real_training_adapter (NO SIMULATION!)
train_spec = importlib.util.spec_from_file_location(
"real_training_adapter",
annotate_dir / "core" / "real_training_adapter.py"
)
train_module = importlib.util.module_from_spec(train_spec)
train_spec.loader.exec_module(train_module)
RealTrainingAdapter = train_module.RealTrainingAdapter
# Load data_loader
data_spec = importlib.util.spec_from_file_location(
"data_loader",
annotate_dir / "core" / "data_loader.py"
)
data_module = importlib.util.module_from_spec(data_spec)
data_spec.loader.exec_module(data_module)
HistoricalDataLoader = data_module.HistoricalDataLoader
TimeRangeManager = data_module.TimeRangeManager
# Setup logging - configure before any logging occurs
log_dir = Path(__file__).parent.parent / 'logs'
log_dir.mkdir(exist_ok=True)
log_file = log_dir / 'annotate_app.log'
# Configure logging to both file and console
# File mode 'w' truncates the file on each run
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, mode='w'), # Truncate on each run
logging.StreamHandler(sys.stdout) # Also print to console
]
)
logger = logging.getLogger(__name__)
logger.info(f"Logging to: {log_file}")
class BacktestRunner:
"""Runs backtest candle-by-candle with model predictions and tracks PnL"""
def __init__(self):
self.active_backtests = {} # backtest_id -> state
self.lock = threading.Lock()
def start_backtest(self, backtest_id: str, model, data_provider, symbol: str, timeframe: str,
orchestrator=None, start_time: Optional[str] = None, end_time: Optional[str] = None):
"""Start backtest in background thread"""
# Initialize backtest state
state = {
'status': 'running',
'candles_processed': 0,
'total_candles': 0,
'pnl': 0.0,
'total_trades': 0,
'wins': 0,
'losses': 0,
'new_predictions': [],
'position': None, # {'type': 'long/short', 'entry_price': float, 'entry_time': str}
'error': None,
'stop_requested': False,
'orchestrator': orchestrator,
'symbol': symbol
}
# Clear previous predictions from orchestrator
if orchestrator and hasattr(orchestrator, 'recent_transformer_predictions'):
if symbol in orchestrator.recent_transformer_predictions:
orchestrator.recent_transformer_predictions[symbol].clear()
if symbol in orchestrator.recent_cnn_predictions:
orchestrator.recent_cnn_predictions[symbol].clear()
if symbol in orchestrator.recent_dqn_predictions:
orchestrator.recent_dqn_predictions[symbol].clear()
logger.info(f"Cleared previous predictions for backtest on {symbol}")
with self.lock:
self.active_backtests[backtest_id] = state
# Run backtest in background thread
thread = threading.Thread(
target=self._run_backtest,
args=(backtest_id, model, data_provider, symbol, timeframe, orchestrator, start_time, end_time)
)
thread.daemon = True
thread.start()
def _run_backtest(self, backtest_id: str, model, data_provider, symbol: str, timeframe: str,
orchestrator=None, start_time: Optional[str] = None, end_time: Optional[str] = None):
"""Execute backtest candle-by-candle"""
try:
state = self.active_backtests[backtest_id]
# Get historical data
logger.info(f"Backtest {backtest_id}: Fetching data for {symbol} {timeframe}")
# Get candles for the time range
if start_time and end_time:
# Parse time range and fetch data
df = data_provider.get_historical_data(
symbol=symbol,
timeframe=timeframe,
limit=1000 # Max candles
)
else:
# Use last 500 candles
df = data_provider.get_historical_data(
symbol=symbol,
timeframe=timeframe,
limit=500
)
if df is None or df.empty:
state['status'] = 'error'
state['error'] = 'No data available'
return
logger.info(f"Backtest {backtest_id}: Processing {len(df)} candles")
state['total_candles'] = len(df)
# Prepare for inference
model.eval()
# IMPORTANT: Use CPU for backtest to avoid ROCm/HIP compatibility issues
# GPU inference has kernel compatibility problems with some model architectures
device = torch.device('cpu')
model.to(device)
logger.info(f"Backtest {backtest_id}: Using CPU for stable inference (avoiding ROCm/HIP issues)")
# Need at least 200 candles for context
min_context = 200
# Process candles one by one
for i in range(min_context, len(df)):
if state['stop_requested']:
state['status'] = 'stopped'
break
# Get context (last 200 candles)
context = df.iloc[i-200:i]
current_candle = df.iloc[i]
current_time = current_candle.name
current_price = float(current_candle['close'])
# Make prediction
prediction = self._make_prediction(model, device, context, symbol, timeframe)
if prediction:
# Store prediction for display
pred_data = {
'timestamp': str(current_time),
'price': current_price,
'action': prediction['action'],
'confidence': prediction['confidence'],
'timeframe': timeframe,
'current_price': current_price
}
state['new_predictions'].append(pred_data)
# Store in orchestrator for visualization
if orchestrator and hasattr(orchestrator, 'store_transformer_prediction'):
# Determine model type from model class name
model_type = model.__class__.__name__.lower()
# Store in appropriate prediction collection
if 'transformer' in model_type:
orchestrator.store_transformer_prediction(symbol, {
'timestamp': current_time,
'current_price': current_price,
'predicted_price': current_price * (1.01 if prediction['action'] == 'BUY' else 0.99),
'price_change': 1.0 if prediction['action'] == 'BUY' else -1.0,
'confidence': prediction['confidence'],
'action': prediction['action'],
'horizon_minutes': 10
})
elif 'cnn' in model_type:
if hasattr(orchestrator, 'recent_cnn_predictions'):
if symbol not in orchestrator.recent_cnn_predictions:
from collections import deque
orchestrator.recent_cnn_predictions[symbol] = deque(maxlen=50)
orchestrator.recent_cnn_predictions[symbol].append({
'timestamp': current_time,
'current_price': current_price,
'predicted_price': current_price * (1.01 if prediction['action'] == 'BUY' else 0.99),
'confidence': prediction['confidence'],
'direction': 2 if prediction['action'] == 'BUY' else 0
})
elif 'dqn' in model_type or 'rl' in model_type:
if hasattr(orchestrator, 'recent_dqn_predictions'):
if symbol not in orchestrator.recent_dqn_predictions:
from collections import deque
orchestrator.recent_dqn_predictions[symbol] = deque(maxlen=100)
orchestrator.recent_dqn_predictions[symbol].append({
'timestamp': current_time,
'current_price': current_price,
'action': prediction['action'],
'confidence': prediction['confidence']
})
# Execute trade logic
self._execute_trade_logic(state, prediction, current_price, current_time)
# Update progress
state['candles_processed'] = i - min_context + 1
# Simulate real-time (optional, remove for faster backtest)
# time.sleep(0.01) # 10ms per candle
# Close any open position at end
if state['position']:
self._close_position(state, current_price, 'backtest_end')
# Calculate final stats
total_trades = state['total_trades']
wins = state['wins']
state['win_rate'] = wins / total_trades if total_trades > 0 else 0
state['status'] = 'complete'
logger.info(f"Backtest {backtest_id}: Complete. PnL=${state['pnl']:.2f}, Trades={total_trades}, Win Rate={state['win_rate']:.1%}")
except Exception as e:
logger.error(f"Backtest {backtest_id} error: {e}", exc_info=True)
state['status'] = 'error'
state['error'] = str(e)
def _make_prediction(self, model, device, context_df, symbol, timeframe):
"""Make model prediction on context data"""
try:
# Convert context to model input format
# Extract OHLCV data
candles = context_df[['open', 'high', 'low', 'close', 'volume']].values
# Normalize
candles_normalized = candles.copy()
price_data = candles[:, :4]
volume_data = candles[:, 4:5]
price_min = price_data.min()
price_max = price_data.max()
if price_max > price_min:
candles_normalized[:, :4] = (price_data - price_min) / (price_max - price_min)
volume_min = volume_data.min()
volume_max = volume_data.max()
if volume_max > volume_min:
candles_normalized[:, 4:5] = (volume_data - volume_min) / (volume_max - volume_min)
# Convert to tensor [1, 200, 5]
# Try GPU first, fallback to CPU if GPU fails
try:
price_tensor = torch.tensor(candles_normalized, dtype=torch.float32).unsqueeze(0).to(device)
tech_data = torch.zeros(1, 40, dtype=torch.float32).to(device)
market_data = torch.zeros(1, 30, dtype=torch.float32).to(device)
use_cpu = False
except Exception as gpu_error:
logger.warning(f"GPU tensor creation failed, using CPU: {gpu_error}")
device = torch.device('cpu')
model.to(device)
price_tensor = torch.tensor(candles_normalized, dtype=torch.float32).unsqueeze(0)
tech_data = torch.zeros(1, 40, dtype=torch.float32)
market_data = torch.zeros(1, 30, dtype=torch.float32)
use_cpu = True
# Make prediction
with torch.no_grad():
try:
outputs = model(
price_data_1m=price_tensor if timeframe == '1m' else None,
price_data_1s=price_tensor if timeframe == '1s' else None,
price_data_1h=price_tensor if timeframe == '1h' else None,
price_data_1d=price_tensor if timeframe == '1d' else None,
tech_data=tech_data,
market_data=market_data
)
except RuntimeError as model_error:
# GPU inference failed, retry on CPU
if not use_cpu and 'HIP' in str(model_error):
logger.warning(f"GPU inference failed, retrying on CPU: {model_error}")
device = torch.device('cpu')
model.to(device)
price_tensor = price_tensor.cpu()
tech_data = tech_data.cpu()
market_data = market_data.cpu()
outputs = model(
price_data_1m=price_tensor if timeframe == '1m' else None,
price_data_1s=price_tensor if timeframe == '1s' else None,
price_data_1h=price_tensor if timeframe == '1h' else None,
price_data_1d=price_tensor if timeframe == '1d' else None,
tech_data=tech_data,
market_data=market_data
)
else:
raise
# Get action prediction
action_probs = outputs.get('action_probs', outputs.get('trend_probs'))
if action_probs is not None:
action_idx = torch.argmax(action_probs, dim=-1).item()
confidence = action_probs[0, action_idx].item()
# Map to BUY/SELL/HOLD
actions = ['BUY', 'SELL', 'HOLD']
if action_idx < len(actions):
action = actions[action_idx]
else:
# If 4 actions (model has 4 trend directions), map to 3 actions
action = 'HOLD' if action_idx == 1 else ('BUY' if action_idx in [2, 3] else 'SELL')
return {
'action': action,
'confidence': confidence
}
return None
except Exception as e:
logger.error(f"Prediction error: {e}", exc_info=True)
return None
def _execute_trade_logic(self, state, prediction, current_price, current_time):
"""Execute trading logic based on prediction"""
action = prediction['action']
confidence = prediction['confidence']
# Only trade on high confidence
if confidence < 0.6:
return
position = state['position']
if action == 'BUY' and position is None:
# Enter long position
state['position'] = {
'type': 'long',
'entry_price': current_price,
'entry_time': current_time
}
logger.debug(f"Backtest: ENTER LONG @ ${current_price}")
elif action == 'SELL' and position is None:
# Enter short position
state['position'] = {
'type': 'short',
'entry_price': current_price,
'entry_time': current_time
}
logger.debug(f"Backtest: ENTER SHORT @ ${current_price}")
elif position is not None:
# Check if should exit
should_exit = False
if position['type'] == 'long' and action == 'SELL':
should_exit = True
elif position['type'] == 'short' and action == 'BUY':
should_exit = True
if should_exit:
self._close_position(state, current_price, 'signal')
def _close_position(self, state, exit_price, reason):
"""Close current position and update PnL"""
position = state['position']
if not position:
return
entry_price = position['entry_price']
# Calculate PnL
if position['type'] == 'long':
pnl = exit_price - entry_price
else: # short
pnl = entry_price - exit_price
# Update state
state['pnl'] += pnl
state['total_trades'] += 1
if pnl > 0:
state['wins'] += 1
elif pnl < 0:
state['losses'] += 1
logger.debug(f"Backtest: CLOSE {position['type'].upper()} @ ${exit_price:.2f}, PnL=${pnl:.2f} ({reason})")
state['position'] = None
def get_progress(self, backtest_id: str) -> Dict:
"""Get backtest progress"""
with self.lock:
state = self.active_backtests.get(backtest_id)
if not state:
return {'success': False, 'error': 'Backtest not found'}
# Get and clear new predictions (they'll be sent to frontend)
new_predictions = state['new_predictions']
state['new_predictions'] = []
return {
'success': True,
'status': state['status'],
'candles_processed': state['candles_processed'],
'total_candles': state['total_candles'],
'pnl': state['pnl'],
'total_trades': state['total_trades'],
'wins': state['wins'],
'losses': state['losses'],
'win_rate': state['wins'] / state['total_trades'] if state['total_trades'] > 0 else 0,
'new_predictions': new_predictions,
'error': state['error']
}
def stop_backtest(self, backtest_id: str):
"""Request backtest to stop"""
with self.lock:
state = self.active_backtests.get(backtest_id)
if state:
state['stop_requested'] = True
class AnnotationDashboard:
"""Main annotation dashboard application"""
def __init__(self):
"""Initialize the dashboard"""
# Load configuration
try:
# Always try YAML loading first since get_config might not work in standalone mode
import yaml
with open('config.yaml', 'r') as f:
self.config = yaml.safe_load(f)
logger.info(f"Loaded config via YAML: {len(self.config)} keys")
except Exception as e:
logger.warning(f"Could not load config via YAML: {e}")
try:
# Fallback to get_config if available
if get_config:
self.config = get_config()
logger.info(f"Loaded config via get_config: {len(self.config)} keys")
else:
raise Exception("get_config not available")
except Exception as e2:
logger.warning(f"Could not load config via get_config: {e2}")
# Final fallback config with SOL/USDT
self.config = {
'symbols': ['ETH/USDT', 'BTC/USDT', 'SOL/USDT'],
'timeframes': ['1s', '1m', '1h', '1d']
}
logger.info("Using fallback config")
# Initialize Flask app
self.server = Flask(
__name__,
template_folder='templates',
static_folder='static'
)
# Initialize SocketIO for WebSocket support
try:
from flask_socketio import SocketIO, emit
self.socketio = SocketIO(
self.server,
cors_allowed_origins="*",
async_mode='threading',
logger=False,
engineio_logger=False
)
self.has_socketio = True
logger.info("SocketIO initialized for real-time updates")
except ImportError:
self.socketio = None
self.has_socketio = False
logger.warning("flask-socketio not installed - live updates will use polling")
# Suppress werkzeug request logs (reduce noise from polling endpoints)
werkzeug_logger = logging.getLogger('werkzeug')
werkzeug_logger.setLevel(logging.WARNING) # Only show warnings and errors, not INFO
# Initialize Dash app (optional component)
self.app = Dash(
__name__,
server=self.server,
url_base_pathname='/dash/',
external_stylesheets=[
'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css',
'https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css'
]
)
# Set a simple Dash layout to avoid NoLayoutException
self.app.layout = html.Div([
html.H1("ANNOTATE Dashboard", className="text-center mb-4"),
html.Div([
html.P("This is the Dash component of the ANNOTATE system."),
html.P("The main interface is available at the Flask routes."),
html.A("Go to Main Interface", href="/", className="btn btn-primary")
], className="container")
])
# Initialize core components (skip initial load for fast startup)
self.data_provider = DataProvider(skip_initial_load=True) if DataProvider else None
# Enable unified storage for real-time data access
if self.data_provider:
self._enable_unified_storage_async()
# ANNOTATE doesn't need orchestrator immediately - lazy load on demand
self.orchestrator = None
self.models_loading = False
self.available_models = ['DQN', 'CNN', 'Transformer'] # Models that CAN be loaded
self.loaded_models = {} # Models that ARE loaded: {name: model_instance}
# Initialize ANNOTATE components
self.annotation_manager = AnnotationManager()
# Use REAL training adapter - NO SIMULATION!
self.training_adapter = RealTrainingAdapter(None, self.data_provider)
# Backtest runner for replaying visible chart with predictions
self.backtest_runner = BacktestRunner()
# Don't auto-load models - wait for user to click LOAD button
logger.info("Models available for lazy loading: " + ", ".join(self.available_models))
# Initialize data loader with existing DataProvider
self.data_loader = HistoricalDataLoader(self.data_provider) if self.data_provider else None
self.time_range_manager = TimeRangeManager(self.data_loader) if self.data_loader else None
# Setup routes
self._setup_routes()
# Start background data refresh after startup
if self.data_loader:
self._start_background_data_refresh()
logger.info("Annotation Dashboard initialized")
def _get_best_checkpoint_info(self, model_name: str) -> Optional[Dict]:
"""
Get best checkpoint info for a model without loading it
First tries database, then falls back to filename parsing
Args:
model_name: Name of the model
Returns:
Dict with checkpoint info or None if no checkpoint found
"""
try:
# Try to get from database first (has full metadata)
try:
from utils.database_manager import get_database_manager
db_manager = get_database_manager()
# Get active checkpoint for this model
with db_manager._get_connection() as conn:
cursor = conn.execute("""
SELECT checkpoint_id, performance_metrics, timestamp, file_path
FROM checkpoint_metadata
WHERE model_name = ? AND is_active = TRUE
ORDER BY timestamp DESC
LIMIT 1
""", (model_name.lower(),))
row = cursor.fetchone()
if row:
import json
checkpoint_id, metrics_json, timestamp, file_path = row
metrics = json.loads(metrics_json) if metrics_json else {}
checkpoint_info = {
'filename': os.path.basename(file_path) if file_path else checkpoint_id,
'epoch': metrics.get('epoch', 0),
'loss': metrics.get('loss'),
'accuracy': metrics.get('accuracy'),
'source': 'database'
}
logger.info(f"Loaded checkpoint info from database for {model_name}: E{checkpoint_info['epoch']}, Loss={checkpoint_info['loss']}, Acc={checkpoint_info['accuracy']}")
return checkpoint_info
except Exception as db_error:
logger.debug(f"Could not load from database: {db_error}")
# Fallback to filename parsing
import glob
import re
# Map model names to checkpoint directories
checkpoint_dirs = {
'Transformer': 'models/checkpoints/transformer',
'CNN': 'models/checkpoints/enhanced_cnn',
'DQN': 'models/checkpoints/dqn_agent'
}
checkpoint_dir = checkpoint_dirs.get(model_name)
if not checkpoint_dir:
return None
if not os.path.exists(checkpoint_dir):
logger.debug(f"Checkpoint directory not found: {checkpoint_dir}")
return None
# Find all checkpoint files
checkpoint_files = glob.glob(os.path.join(checkpoint_dir, '*.pt'))
if not checkpoint_files:
logger.debug(f"No checkpoint files found in {checkpoint_dir}")
return None
logger.debug(f"Found {len(checkpoint_files)} checkpoints for {model_name}")
# Parse filenames to extract epoch info
# Format: transformer_epoch5_20251110_123620.pt
best_checkpoint = None
best_epoch = -1
for cp_file in checkpoint_files:
try:
filename = os.path.basename(cp_file)
# Extract epoch number from filename
match = re.search(r'epoch(\d+)', filename, re.IGNORECASE)
if match:
epoch = int(match.group(1))
if epoch > best_epoch:
best_epoch = epoch
best_checkpoint = {
'filename': filename,
'epoch': epoch,
'loss': None, # Can't get without loading
'accuracy': None, # Can't get without loading
'source': 'filename'
}
logger.debug(f"Found checkpoint: {filename}, epoch {epoch}")
except Exception as e:
logger.debug(f"Could not parse checkpoint {cp_file}: {e}")
continue
if best_checkpoint:
logger.info(f"Best checkpoint for {model_name}: {best_checkpoint['filename']} (E{best_checkpoint['epoch']})")
return best_checkpoint
except Exception as e:
logger.error(f"Error getting checkpoint info for {model_name}: {e}")
import traceback
logger.error(traceback.format_exc())
return None
def _load_model_lazy(self, model_name: str) -> dict:
"""
Lazy load a specific model on demand
Args:
model_name: Name of model to load ('DQN', 'CNN', 'Transformer')
Returns:
dict: Result with success status and message
"""
try:
# Check if already loaded
if model_name in self.loaded_models:
return {
'success': True,
'message': f'{model_name} already loaded',
'already_loaded': True
}
# Check if model is available
if model_name not in self.available_models:
return {
'success': False,
'error': f'{model_name} is not in available models list'
}
logger.info(f"Loading {model_name} model...")
# Initialize orchestrator if not already done
if not self.orchestrator:
if not TradingOrchestrator:
return {
'success': False,
'error': 'TradingOrchestrator class not available'
}
logger.info("Creating TradingOrchestrator instance...")
self.orchestrator = TradingOrchestrator(
data_provider=self.data_provider,
enhanced_rl_training=True
)
logger.info("Orchestrator created")
# Update training adapter
self.training_adapter.orchestrator = self.orchestrator
# Load specific model
if model_name == 'DQN':
if not hasattr(self.orchestrator, 'rl_agent') or not self.orchestrator.rl_agent:
# Initialize RL agent
self.orchestrator._initialize_rl_agent()
self.loaded_models['DQN'] = self.orchestrator.rl_agent
elif model_name == 'CNN':
if not hasattr(self.orchestrator, 'cnn_model') or not self.orchestrator.cnn_model:
# Initialize CNN model
self.orchestrator._initialize_cnn_model()
self.loaded_models['CNN'] = self.orchestrator.cnn_model
elif model_name == 'Transformer':
if not hasattr(self.orchestrator, 'primary_transformer') or not self.orchestrator.primary_transformer:
# Initialize Transformer model
self.orchestrator._initialize_transformer_model()
self.loaded_models['Transformer'] = self.orchestrator.primary_transformer
else:
return {
'success': False,
'error': f'Unknown model: {model_name}'
}
logger.info(f"{model_name} model loaded successfully")
return {
'success': True,
'message': f'{model_name} loaded successfully',
'loaded_models': list(self.loaded_models.keys())
}
except Exception as e:
logger.error(f"Error loading {model_name}: {e}")
import traceback
logger.error(f"Traceback:\n{traceback.format_exc()}")
return {
'success': False,
'error': str(e)
}
def _enable_unified_storage_async(self):
"""Enable unified storage system in background thread"""
def enable_storage():
try:
import asyncio
import threading
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Enable unified storage
success = loop.run_until_complete(
self.data_provider.enable_unified_storage()
)
if success:
logger.info(" ANNOTATE: Unified storage enabled for real-time data")
# Get statistics
stats = self.data_provider.get_unified_storage_stats()
if stats.get('initialized'):
logger.info(" Real-time data access: <10ms")
logger.info(" Historical data access: <100ms")
logger.info(" Annotation data: Available at any timestamp")
else:
logger.warning(" ANNOTATE: Unified storage not available, using cached data only")
except Exception as e:
logger.warning(f"ANNOTATE: Could not enable unified storage: {e}")
logger.info("ANNOTATE: Continuing with cached data access")
# Start in background thread
import threading
storage_thread = threading.Thread(target=enable_storage, daemon=True)
storage_thread.start()
def _start_background_data_refresh(self):
"""Start background task to refresh recent data after startup - ONCE ONLY"""
def refresh_recent_data():
try:
import time
# Wait for app to fully start
time.sleep(5)
logger.info(" Starting one-time background data refresh (fetching only recent missing data)")
# Disable startup mode to fetch fresh data
self.data_loader.disable_startup_mode()
# Use the new on-demand refresh method
logger.info("Using on-demand refresh for recent data")
self.data_provider.refresh_data_on_demand()
logger.info(" One-time background data refresh completed")
except Exception as e:
logger.error(f"Error in background data refresh: {e}")
# Start refresh in background thread
import threading
refresh_thread = threading.Thread(target=refresh_recent_data, daemon=True)
refresh_thread.start()
logger.info("One-time background data refresh scheduled")
def _get_pivot_markers_for_timeframe(self, symbol: str, timeframe: str, df: pd.DataFrame) -> dict:
"""
Get pivot markers for a specific timeframe using WilliamsMarketStructure directly
Returns dict with all pivot points and identifies which are the last high/low per level
"""
try:
if WilliamsMarketStructure is None:
logger.warning("WilliamsMarketStructure not available")
return {}
if df is None or len(df) < 10:
logger.warning(f"Insufficient data for pivot calculation: {len(df) if df is not None else 0} bars")
return {}
# Convert DataFrame to numpy array format expected by Williams Market Structure
ohlcv_array = df[['open', 'high', 'low', 'close', 'volume']].copy()
# Add timestamp as first column (convert to milliseconds)
timestamps = df.index.astype(np.int64) // 10**6 # pandas index is ns -> convert to ms
ohlcv_array.insert(0, 'timestamp', timestamps)
ohlcv_array = ohlcv_array.to_numpy()
# Initialize Williams Market Structure with default distance
# We'll override it in the calculation call
williams = WilliamsMarketStructure(min_pivot_distance=1)
# Calculate recursive pivot points with min_pivot_distance=2
# This ensures 5 candles per pivot (tip + 2 prev + 2 next)
pivot_levels = williams.calculate_recursive_pivot_points(
ohlcv_array,
min_pivot_distance=2
)
if not pivot_levels:
logger.debug(f"No pivot levels found for {symbol} {timeframe}")
return {}
# Build a map of timestamp -> pivot info
# Also track last high/low per level for drawing horizontal lines
pivot_map = {}
last_pivots = {} # {level: {'high': (ts_str, idx), 'low': (ts_str, idx)}}
# For each level (1-5), collect ALL pivot points
for level_num, trend_level in pivot_levels.items():
if not hasattr(trend_level, 'pivot_points') or not trend_level.pivot_points:
continue
last_pivots[level_num] = {'high': None, 'low': None}
# Add ALL pivot points to the map
for pivot in trend_level.pivot_points:
ts_str = pivot.timestamp.strftime('%Y-%m-%d %H:%M:%S')
if ts_str not in pivot_map:
pivot_map[ts_str] = {'highs': [], 'lows': []}
pivot_info = {
'level': level_num,
'price': pivot.price,
'strength': pivot.strength,
'is_last': False # Will be updated below
}
if pivot.pivot_type == 'high':
pivot_map[ts_str]['highs'].append(pivot_info)
last_pivots[level_num]['high'] = (ts_str, len(pivot_map[ts_str]['highs']) - 1)
elif pivot.pivot_type == 'low':
pivot_map[ts_str]['lows'].append(pivot_info)
last_pivots[level_num]['low'] = (ts_str, len(pivot_map[ts_str]['lows']) - 1)
# Mark the last high and last low for each level
for level_num, last_info in last_pivots.items():
if last_info['high']:
ts_str, idx = last_info['high']
pivot_map[ts_str]['highs'][idx]['is_last'] = True
if last_info['low']:
ts_str, idx = last_info['low']
pivot_map[ts_str]['lows'][idx]['is_last'] = True
logger.info(f"Found {len(pivot_map)} pivot candles for {symbol} {timeframe} (from {len(df)} candles)")
return pivot_map
except Exception as e:
logger.error(f"Error getting pivot markers for {timeframe}: {e}")
import traceback
logger.error(traceback.format_exc())
return {}
def _setup_routes(self):
"""Setup Flask routes"""
@self.server.route('/favicon.ico')
def favicon():
"""Serve favicon to prevent 404 errors"""
from flask import Response
# Return a simple 1x1 transparent pixel as favicon
favicon_data = b'\x00\x00\x01\x00\x01\x00\x10\x10\x00\x00\x01\x00\x20\x00\x68\x04\x00\x00\x16\x00\x00\x00\x28\x00\x00\x00\x10\x00\x00\x00\x20\x00\x00\x00\x01\x00\x20\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
return Response(favicon_data, mimetype='image/x-icon')
@self.server.route('/')
def index():
"""Main dashboard page - loads existing annotations"""
try:
# Get symbols and timeframes from config
symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT'])
timeframes = self.config.get('timeframes', ['1s', '1m', '1h', '1d'])
current_symbol = symbols[0] if symbols else 'ETH/USDT'
# Get annotations filtered by current symbol
annotations = self.annotation_manager.get_annotations(symbol=current_symbol)
# Convert to serializable format
annotations_data = []
for ann in annotations:
if hasattr(ann, '__dict__'):
ann_dict = ann.__dict__
else:
ann_dict = ann
# Ensure all fields are JSON serializable
annotations_data.append({
'annotation_id': ann_dict.get('annotation_id'),
'symbol': ann_dict.get('symbol'),
'timeframe': ann_dict.get('timeframe'),
'entry': ann_dict.get('entry'),
'exit': ann_dict.get('exit'),
'direction': ann_dict.get('direction'),
'profit_loss_pct': ann_dict.get('profit_loss_pct'),
'notes': ann_dict.get('notes', ''),
'created_at': ann_dict.get('created_at')
})
logger.info(f"Loading dashboard with {len(annotations_data)} annotations for {current_symbol}")
# Prepare template data
template_data = {
'current_symbol': current_symbol,
'symbols': symbols,
'timeframes': timeframes,
'annotations': annotations_data
}
return render_template('annotation_dashboard.html', **template_data)
except Exception as e:
logger.error(f"Error rendering main page: {e}")
# Fallback simple HTML page
return f"""
ANNOTATE - Manual Trade Annotation UI
📝 ANNOTATE - Manual Trade Annotation UI
System Status
Annotation Manager: Active
Data Provider: {'Available' if self.data_provider else 'Not Available (Standalone Mode)'}
Trading Orchestrator: {'Available' if self.orchestrator else 'Not Available (Standalone Mode)'}