annotation system operational

This commit is contained in:
Dobromir Popov
2025-10-18 18:41:58 +03:00
parent 3d91cb0e8f
commit 38d6a01f8e
12 changed files with 1996 additions and 116 deletions

View File

@@ -84,7 +84,9 @@ class AnnotationManager:
             raise
 
     def create_annotation(self, entry_point: Dict, exit_point: Dict,
-                          symbol: str, timeframe: str) -> TradeAnnotation:
+                          symbol: str, timeframe: str,
+                          entry_market_state: Dict = None,
+                          exit_market_state: Dict = None) -> TradeAnnotation:
         """Create new trade annotation"""
         # Calculate direction and P&L
         entry_price = entry_point['price']
@@ -161,38 +163,82 @@ class AnnotationManager:
         else:
             logger.warning(f"Annotation not found: {annotation_id}")
 
-    def generate_test_case(self, annotation: TradeAnnotation) -> Dict:
-        """Generate test case from annotation in realtime format"""
-        # This will be populated with actual market data in Task 2
+    def generate_test_case(self, annotation: TradeAnnotation, data_provider=None) -> Dict:
+        """
+        Generate test case from annotation in realtime format
+
+        Args:
+            annotation: TradeAnnotation object
+            data_provider: Optional DataProvider instance to fetch market context
+
+        Returns:
+            Test case dictionary in realtime format
+        """
         test_case = {
             "test_case_id": f"annotation_{annotation.annotation_id}",
             "symbol": annotation.symbol,
             "timestamp": annotation.entry['timestamp'],
             "action": "BUY" if annotation.direction == "LONG" else "SELL",
-            "market_state": {
-                # Will be populated with BaseDataInput structure
-                "ohlcv_1s": [],
-                "ohlcv_1m": [],
-                "ohlcv_1h": [],
-                "ohlcv_1d": [],
-                "cob_data": {},
-                "technical_indicators": {},
-                "pivot_points": []
-            },
+            "market_state": {},
             "expected_outcome": {
                 "direction": annotation.direction,
                 "profit_loss_pct": annotation.profit_loss_pct,
                 "holding_period_seconds": self._calculate_holding_period(annotation),
-                "exit_price": annotation.exit['price']
+                "exit_price": annotation.exit['price'],
+                "entry_price": annotation.entry['price']
             },
             "annotation_metadata": {
                 "annotator": "manual",
                 "confidence": 1.0,
                 "notes": annotation.notes,
-                "created_at": annotation.created_at
+                "created_at": annotation.created_at,
+                "timeframe": annotation.timeframe
             }
         }
+
+        # Populate market state if data_provider is available
+        if data_provider and annotation.market_context:
+            test_case["market_state"] = annotation.market_context
+        elif data_provider:
+            # Fetch market state at entry time
+            try:
+                entry_time = datetime.fromisoformat(annotation.entry['timestamp'].replace('Z', '+00:00'))
+
+                # Fetch OHLCV data for all timeframes
+                timeframes = ['1s', '1m', '1h', '1d']
+                market_state = {}
+
+                for tf in timeframes:
+                    df = data_provider.get_historical_data(
+                        symbol=annotation.symbol,
+                        timeframe=tf,
+                        limit=100
+                    )
+
+                    if df is not None and not df.empty:
+                        # Filter to data before entry time
+                        df = df[df.index <= entry_time]
+
+                        if not df.empty:
+                            # Convert to list format
+                            market_state[f'ohlcv_{tf}'] = {
+                                'timestamps': df.index.strftime('%Y-%m-%d %H:%M:%S').tolist(),
+                                'open': df['open'].tolist(),
+                                'high': df['high'].tolist(),
+                                'low': df['low'].tolist(),
+                                'close': df['close'].tolist(),
+                                'volume': df['volume'].tolist()
+                            }
+
+                test_case["market_state"] = market_state
+                logger.info(f"Populated market state with {len(market_state)} timeframes")
+
+            except Exception as e:
+                logger.error(f"Error fetching market state: {e}")
+                test_case["market_state"] = {}
+        else:
+            test_case["market_state"] = {}
 
         # Save test case to file
         test_case_file = self.test_cases_dir / f"{test_case['test_case_id']}.json"
         with open(test_case_file, 'w') as f:
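
Taken together, the new signature and the data_provider hook let a caller go from a manual annotation to a fully populated test case in two calls. A minimal sketch, assuming an AnnotationManager constructed with the project defaults and a live core.data_provider.DataProvider instance named data_provider (entry/exit values are illustrative, not from the commit):

    manager = AnnotationManager()  # constructor arguments assumed
    annotation = manager.create_annotation(
        entry_point={'timestamp': '2025-10-18T10:00:00Z', 'price': 3850.0},
        exit_point={'timestamp': '2025-10-18T11:30:00Z', 'price': 3912.0},
        symbol='ETH/USDT',
        timeframe='1m',
    )
    # With a data_provider, market_state is back-filled with ohlcv_1s/1m/1h/1d
    test_case = manager.generate_test_case(annotation, data_provider=data_provider)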

View File

@@ -0,0 +1,292 @@
"""
Historical Data Loader - Integrates with existing DataProvider
Provides data loading and caching for the annotation UI, ensuring the same
data quality and structure used by training and inference systems.
"""
import logging
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
import pandas as pd
from pathlib import Path
import pickle
logger = logging.getLogger(__name__)
class HistoricalDataLoader:
"""
Loads historical data from the main system's DataProvider
Ensures consistency with training/inference data
"""
def __init__(self, data_provider):
"""
Initialize with existing DataProvider
Args:
data_provider: Instance of core.data_provider.DataProvider
"""
self.data_provider = data_provider
self.cache_dir = Path("ANNOTATE/data/cache")
self.cache_dir.mkdir(parents=True, exist_ok=True)
# Cache for recently loaded data
self.memory_cache = {}
self.cache_ttl = timedelta(minutes=5)
logger.info("HistoricalDataLoader initialized with existing DataProvider")
def get_data(self, symbol: str, timeframe: str,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 500) -> Optional[pd.DataFrame]:
"""
Get historical data for symbol and timeframe
Args:
symbol: Trading pair (e.g., 'ETH/USDT')
timeframe: Timeframe (e.g., '1s', '1m', '1h', '1d')
start_time: Start time for data range
end_time: End time for data range
limit: Maximum number of candles to return
Returns:
DataFrame with OHLCV data or None if unavailable
"""
# Check memory cache first
cache_key = f"{symbol}_{timeframe}_{start_time}_{end_time}_{limit}"
if cache_key in self.memory_cache:
cached_data, cached_time = self.memory_cache[cache_key]
if datetime.now() - cached_time < self.cache_ttl:
logger.debug(f"Returning cached data for {symbol} {timeframe}")
return cached_data
try:
# Use DataProvider's cached data if available
if hasattr(self.data_provider, 'cached_data'):
if symbol in self.data_provider.cached_data:
if timeframe in self.data_provider.cached_data[symbol]:
df = self.data_provider.cached_data[symbol][timeframe]
if df is not None and not df.empty:
# Filter by time range if specified
if start_time or end_time:
df = self._filter_by_time_range(df, start_time, end_time)
# Limit number of candles
if len(df) > limit:
df = df.tail(limit)
# Cache in memory
self.memory_cache[cache_key] = (df.copy(), datetime.now())
logger.info(f"Loaded {len(df)} candles for {symbol} {timeframe}")
return df
# Fallback: fetch from DataProvider's historical data method
logger.info(f"Fetching fresh data for {symbol} {timeframe}")
df = self.data_provider.get_historical_data(
symbol=symbol,
timeframe=timeframe,
limit=limit
)
if df is not None and not df.empty:
# Filter by time range if specified
if start_time or end_time:
df = self._filter_by_time_range(df, start_time, end_time)
# Cache in memory
self.memory_cache[cache_key] = (df.copy(), datetime.now())
logger.info(f"Fetched {len(df)} candles for {symbol} {timeframe}")
return df
logger.warning(f"No data available for {symbol} {timeframe}")
return None
except Exception as e:
logger.error(f"Error loading data for {symbol} {timeframe}: {e}")
return None
def _filter_by_time_range(self, df: pd.DataFrame,
start_time: Optional[datetime],
end_time: Optional[datetime]) -> pd.DataFrame:
"""Filter DataFrame by time range"""
if start_time:
df = df[df.index >= start_time]
if end_time:
df = df[df.index <= end_time]
return df
def get_multi_timeframe_data(self, symbol: str,
timeframes: List[str],
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
limit: int = 500) -> Dict[str, pd.DataFrame]:
"""
Get data for multiple timeframes at once
Args:
symbol: Trading pair
timeframes: List of timeframes
start_time: Start time for data range
end_time: End time for data range
limit: Maximum number of candles per timeframe
Returns:
Dictionary mapping timeframe to DataFrame
"""
result = {}
for timeframe in timeframes:
df = self.get_data(
symbol=symbol,
timeframe=timeframe,
start_time=start_time,
end_time=end_time,
limit=limit
)
if df is not None:
result[timeframe] = df
logger.info(f"Loaded data for {len(result)}/{len(timeframes)} timeframes")
return result
def prefetch_data(self, symbol: str, timeframes: List[str], limit: int = 1000):
"""
Prefetch data for smooth scrolling
Args:
symbol: Trading pair
timeframes: List of timeframes to prefetch
limit: Number of candles to prefetch
"""
logger.info(f"Prefetching data for {symbol}: {timeframes}")
for timeframe in timeframes:
self.get_data(symbol, timeframe, limit=limit)
def clear_cache(self):
"""Clear memory cache"""
self.memory_cache.clear()
logger.info("Memory cache cleared")
def get_data_boundaries(self, symbol: str, timeframe: str) -> Tuple[Optional[datetime], Optional[datetime]]:
"""
Get the earliest and latest available data timestamps
Args:
symbol: Trading pair
timeframe: Timeframe
Returns:
Tuple of (earliest_time, latest_time) or (None, None) if no data
"""
try:
df = self.get_data(symbol, timeframe, limit=10000)
if df is not None and not df.empty:
return (df.index.min(), df.index.max())
return (None, None)
except Exception as e:
logger.error(f"Error getting data boundaries: {e}")
return (None, None)
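    # Illustrative usage (not part of this module; symbols and limits are examples):
    #
    #     loader = HistoricalDataLoader(data_provider)
    #     df = loader.get_data('ETH/USDT', '1m', limit=200)   # cached in memory for 5 min
    #     frames = loader.get_multi_timeframe_data('ETH/USDT', ['1m', '1h', '1d'])
    #     earliest, latest = loader.get_data_boundaries('ETH/USDT', '1m')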
class TimeRangeManager:
"""Manages time range calculations and data prefetching"""
def __init__(self, data_loader: HistoricalDataLoader):
"""
Initialize with data loader
Args:
data_loader: HistoricalDataLoader instance
"""
self.data_loader = data_loader
# Time range presets in seconds
self.range_presets = {
'1h': 3600,
'4h': 14400,
'1d': 86400,
'1w': 604800,
'1M': 2592000
}
logger.info("TimeRangeManager initialized")
def calculate_time_range(self, center_time: datetime,
range_preset: str) -> Tuple[datetime, datetime]:
"""
Calculate start and end times for a range preset
Args:
center_time: Center point of the range
range_preset: Range preset ('1h', '4h', '1d', '1w', '1M')
Returns:
Tuple of (start_time, end_time)
"""
range_seconds = self.range_presets.get(range_preset, 86400)
half_range = timedelta(seconds=range_seconds / 2)
start_time = center_time - half_range
end_time = center_time + half_range
return (start_time, end_time)
def get_navigation_increment(self, range_preset: str) -> timedelta:
"""
Get time increment for navigation (10% of range)
Args:
range_preset: Range preset
Returns:
timedelta for navigation increment
"""
range_seconds = self.range_presets.get(range_preset, 86400)
increment_seconds = range_seconds / 10
return timedelta(seconds=increment_seconds)
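        # Example: for range_preset='4h' the range is 14400 s, so the increment is
        # timedelta(seconds=1440), i.e. each navigation step pans the view by 24 minutes.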
def prefetch_adjacent_ranges(self, symbol: str, timeframes: List[str],
center_time: datetime, range_preset: str):
"""
Prefetch data for adjacent time ranges for smooth scrolling
Args:
symbol: Trading pair
timeframes: List of timeframes
center_time: Current center time
range_preset: Current range preset
"""
increment = self.get_navigation_increment(range_preset)
# Prefetch previous range
prev_center = center_time - increment
prev_start, prev_end = self.calculate_time_range(prev_center, range_preset)
# Prefetch next range
next_center = center_time + increment
next_start, next_end = self.calculate_time_range(next_center, range_preset)
logger.debug(f"Prefetching adjacent ranges for {symbol}")
# Prefetch in background (non-blocking)
import threading
def prefetch():
for timeframe in timeframes:
self.data_loader.get_data(symbol, timeframe, prev_start, prev_end)
self.data_loader.get_data(symbol, timeframe, next_start, next_end)
thread = threading.Thread(target=prefetch, daemon=True)
thread.start()
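
The two classes are designed to compose: the loader owns caching, the range manager owns window arithmetic and background prefetch. A sketch of the intended flow, assuming an existing DataProvider instance named data_provider (times and presets are illustrative):

    from datetime import datetime

    loader = HistoricalDataLoader(data_provider)
    ranges = TimeRangeManager(loader)

    center = datetime(2025, 10, 18, 12, 0)
    start, end = ranges.calculate_time_range(center, '4h')   # 10:00 .. 14:00
    frames = loader.get_multi_timeframe_data('ETH/USDT', ['1m', '1h'], start, end)

    # Warm the cache on either side of the view so panning stays smooth
    ranges.prefetch_adjacent_ranges('ETH/USDT', ['1m', '1h'], center, '4h')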

View File

@@ -62,16 +62,56 @@ class TrainingSimulator:
     def load_model(self, model_name: str):
         """Load model from orchestrator"""
         if model_name in self.model_cache:
             logger.info(f"Using cached model: {model_name}")
             return self.model_cache[model_name]
 
         if not self.orchestrator:
             logger.error("Orchestrator not available")
             return None
 
-        # Get model from orchestrator
-        # This will be implemented when we integrate with actual models
-        logger.info(f"Loading model: {model_name}")
-        return None
+        try:
+            # Get model from orchestrator based on name
+            model = None
+
+            if model_name == "StandardizedCNN" or model_name == "CNN":
+                model = self.orchestrator.cnn_model
+            elif model_name == "DQN":
+                model = self.orchestrator.rl_agent
+            elif model_name == "Transformer":
+                model = self.orchestrator.primary_transformer
+            elif model_name == "COB":
+                model = self.orchestrator.cob_rl_agent
+
+            if model:
+                self.model_cache[model_name] = model
+                logger.info(f"Loaded model: {model_name}")
+                return model
+            else:
+                logger.warning(f"Model not found: {model_name}")
+                return None
+
+        except Exception as e:
+            logger.error(f"Error loading model {model_name}: {e}")
+            return None
+
+    def get_available_models(self) -> List[str]:
+        """Get list of available models from orchestrator"""
+        if not self.orchestrator:
+            return []
+
+        available = []
+        if self.orchestrator.cnn_model:
+            available.append("StandardizedCNN")
+        if self.orchestrator.rl_agent:
+            available.append("DQN")
+        if self.orchestrator.primary_transformer:
+            available.append("Transformer")
+        if self.orchestrator.cob_rl_agent:
+            available.append("COB")
+
+        logger.info(f"Available models: {available}")
+        return available
+
     def start_training(self, model_name: str, test_cases: List[Dict]) -> str:
         """Start training session with test cases"""