wip symbols tidy up

This commit is contained in:
Dobromir Popov
2025-07-24 16:08:58 +03:00
parent d17af5ca4b
commit 045780758a
4 changed files with 184 additions and 58 deletions

View File

@@ -74,6 +74,16 @@ Based on the existing implementation in `core/data_provider.py`, we'll enhance i
- 1s, 5s, 15s and 60s moving averages (MA) of the COB imbalance, counting ±5 COB buckets (see the sketch below)
- ***OUTPUTS***: suggested trade action (BUY/SELL)
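For illustration, a minimal sketch of those imbalance MAs, assuming hypothetical `bids`/`asks` frames at 1s resolution with one column per price bucket (names and shapes are assumptions, not the project's real data layout):
import pandas as pd

def cob_imbalance_mas(bids: pd.DataFrame, asks: pd.DataFrame) -> pd.DataFrame:
    # Aggregate liquidity across the +-5 buckets around the mid price
    total_bid = bids.sum(axis=1)
    total_ask = asks.sum(axis=1)
    imbalance = (total_bid - total_ask) / (total_bid + total_ask)
    out = pd.DataFrame(index=imbalance.index)
    for window in (1, 5, 15, 60):  # seconds, assuming one row per second
        out[f'imbalance_ma_{window}s'] = imbalance.rolling(window).mean()
    return out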
# Standardized input for all models:
{
'primary_symbol': 'ETH/USDT',
'reference_symbol': 'BTC/USDT',
'eth_data': {'ETH_1s': df, 'ETH_1m': df, 'ETH_1h': df, 'ETH_1d': df},
'btc_data': {'BTC_1s': df},
'current_prices': {'ETH': price, 'BTC': price},
'data_completeness': {...}
}
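A consumer could gate inference on the `data_completeness` counters carried in this package; a minimal sketch (the helper name is an assumption):
def is_input_complete(standardized_input: dict) -> bool:
    completeness = standardized_input.get('data_completeness', {})
    actual = completeness.get('eth_timeframes', 0) + completeness.get('btc_reference', 0)
    return actual == completeness.get('total_expected', 5)  # 4 ETH timeframes + 1 BTC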
### 2. CNN Model
The CNN Model is responsible for analyzing patterns in market data and predicting pivot points across multiple timeframes.
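For illustration only, a sketch of one way to predict pivot points across timeframes: a shared 1D-conv trunk with one head per timeframe. All channel sizes and the head layout are assumptions, not the project's actual architecture.
import torch
import torch.nn as nn

class PivotCNN(nn.Module):
    """Illustrative sketch: shared trunk, one pivot head per timeframe."""
    def __init__(self, in_channels: int = 5):
        super().__init__()
        self.trunk = nn.Sequential(
            nn.Conv1d(in_channels, 32, kernel_size=5, padding=2), nn.ReLU(),
            nn.Conv1d(32, 64, kernel_size=5, padding=2), nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        )
        # One regression head per timeframe for the next pivot-point price
        self.heads = nn.ModuleDict({tf: nn.Linear(64, 1) for tf in ('1s', '1m', '1h', '1d')})

    def forward(self, x: torch.Tensor) -> dict:
        h = self.trunk(x).squeeze(-1)  # (batch, channels, seq) -> (batch, 64)
        return {tf: head(h) for tf, head in self.heads.items()}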

View File

@@ -197,7 +197,8 @@
- Ensure validation occurs before any model inference
- _Requirements: 9.1, 9.4_
- [ ] 5.2. Implement persistent inference history storage
- [x] 5.2. Implement persistent inference history storage
- Create InferenceHistoryStore class for persistent storage
- Store complete input data packages with each prediction
- Include timestamp, symbol, input features, prediction outputs, confidence scores
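A minimal sketch of such a store, assuming a per-model directory layout; names and fields follow the bullets above but are illustrative, not the committed implementation:
import json
from datetime import datetime
from pathlib import Path

class InferenceHistoryStore:
    """Sketch of a persistent per-model inference store (layout assumed)."""
    def __init__(self, root: str = "training_data/inference_history"):
        self.root = Path(root)

    def store(self, model_name: str, symbol: str, input_features,
              prediction_outputs, confidence: float) -> Path:
        timestamp = datetime.now().isoformat()
        record = {
            'timestamp': timestamp,
            'symbol': symbol,
            'input_features': input_features,        # complete input data package
            'prediction_outputs': prediction_outputs,
            'confidence': confidence,
        }
        model_dir = self.root / model_name
        model_dir.mkdir(parents=True, exist_ok=True)
        path = model_dir / f"inference_{timestamp.replace(':', '-')}.json"
        with open(path, 'w') as f:
            json.dump(record, f, indent=2, default=str)
        return path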

View File

@@ -110,7 +110,9 @@ class TradingOrchestrator:
self.confidence_threshold_close = self.config.orchestrator.get('confidence_threshold_close', 0.08) # Lowered from 0.10
# Decision frequency limit to prevent excessive trading
self.decision_frequency = self.config.orchestrator.get('decision_frequency', 30)
self.symbols = self.config.get('symbols', ['ETH/USDT', 'BTC/USDT']) # Enhanced to support multiple symbols
self.symbol = self.config.get('symbol', "ETH/USDT") # main symbol we are trading and making predictions on; only one!
self.ref_symbols = self.config.get('ref_symbols', ['BTC/USDT']) # Enhanced to support multiple reference symbols. TODO: we can add 'SOL/USDT' later
# NEW: Aggressiveness parameters
self.entry_aggressiveness = self.config.orchestrator.get('entry_aggressiveness', 0.5) # 0.0 = conservative, 1.0 = very aggressive
@@ -153,12 +155,11 @@ class TradingOrchestrator:
self.recent_cnn_predictions: Dict[str, deque] = {} # {symbol: List[Dict]} - Recent CNN predictions
self.prediction_accuracy_history: Dict[str, deque] = {} # {symbol: List[Dict]} - Prediction accuracy tracking
# Initialize prediction tracking for each symbol
for symbol in self.symbols:
self.recent_dqn_predictions[symbol] = deque(maxlen=100)
self.recent_cnn_predictions[symbol] = deque(maxlen=50)
self.prediction_accuracy_history[symbol] = deque(maxlen=200)
self.signal_accumulator[symbol] = []
# Initialize prediction tracking for the primary trading symbol only
self.recent_dqn_predictions[self.symbol] = deque(maxlen=100)
self.recent_cnn_predictions[self.symbol] = deque(maxlen=50)
self.prediction_accuracy_history[self.symbol] = deque(maxlen=200)
self.signal_accumulator[self.symbol] = []
# Decision callbacks
self.decision_callbacks: List[Any] = []
@@ -177,7 +178,7 @@ class TradingOrchestrator:
self.latest_cob_data: Dict[str, Any] = {} # {symbol: COBSnapshot}
self.latest_cob_features: Dict[str, Any] = {} # {symbol: np.ndarray} - CNN features
self.latest_cob_state: Dict[str, Any] = {} # {symbol: np.ndarray} - DQN state features
self.cob_feature_history: Dict[str, List[Any]] = {symbol: [] for symbol in self.symbols} # Rolling history for models
self.cob_feature_history: Dict[str, List[Any]] = {self.symbol: []} # Rolling history for primary trading symbol
# Enhanced ML Models
self.rl_agent: Any = None # DQN Agent
@@ -204,13 +205,13 @@ class TradingOrchestrator:
# Training tracking
self.last_trained_symbols: Dict[str, datetime] = {}
# INFERENCE DATA STORAGE - Store model inputs and outputs for training
self.inference_history: Dict[str, deque] = {} # {symbol: deque of inference records}
self.max_inference_history = 1000 # Keep last 1000 inference records per symbol
# INFERENCE DATA STORAGE - Per-model storage with memory optimization
self.inference_history: Dict[str, deque] = {} # {model_name: deque of last 5 inference records}
self.max_memory_inferences = 5 # Keep only last 5 inferences in memory per model
self.max_disk_files_per_model = 200 # Cap disk files per model
# Initialize inference history for each symbol
for symbol in self.symbols:
self.inference_history[symbol] = deque(maxlen=self.max_inference_history)
# Initialize inference history for each model (will be populated as models make predictions)
# We'll create entries dynamically as models are used
# ENHANCED: Real-time Training System Integration
self.enhanced_training_system = None # Will be set to EnhancedRealtimeTrainingSystem if available
@@ -1071,9 +1072,9 @@ class TradingOrchestrator:
# Store input data for generic model
model_input = input_data.get('generic_input')
# Store inference data for training
# Store inference data for training (per-model, async)
if prediction and model_input is not None:
self._store_inference_data(symbol, model_name, model_input, prediction, current_time)
await self._store_inference_data_async(model_name, model_input, prediction, current_time)
except Exception as e:
logger.error(f"Error getting prediction from {model_name}: {e}")
@@ -1085,61 +1086,134 @@ class TradingOrchestrator:
return predictions
async def _collect_model_input_data(self, symbol: str) -> Dict[str, Any]:
"""Collect comprehensive input data for all models"""
"""Collect standardized input data for all models - ETH primary + BTC reference"""
try:
input_data = {}
# Only collect data for ETH (primary symbol) - we run inference only for ETH
if symbol != 'ETH/USDT':
return {}
# Get current market data from data provider
current_price = self.data_provider.get_current_price(symbol)
# Standardized input: 4 ETH timeframes + 1s BTC reference
eth_data = {}
eth_timeframes = ['1s', '1m', '1h', '1d']
# Collect OHLCV data for multiple timeframes
ohlcv_data = {}
timeframes = ['1s', '1m', '1h', '1d']
for tf in timeframes:
df = self.data_provider.get_historical_data(symbol, tf, limit=300)
# Collect ETH data for all timeframes
for tf in eth_timeframes:
df = self.data_provider.get_historical_data('ETH/USDT', tf, limit=300)
if df is not None and not df.empty:
ohlcv_data[tf] = df
eth_data[f'ETH_{tf}'] = df
# Collect COB data if available
cob_data = self.get_cob_snapshot(symbol)
# Collect BTC 1s reference data
btc_1s = self.data_provider.get_historical_data('BTC/USDT', '1s', limit=300)
btc_data = {}
if btc_1s is not None and not btc_1s.empty:
btc_data['BTC_1s'] = btc_1s
# Collect technical indicators
technical_indicators = {}
if '1h' in ohlcv_data:
df = ohlcv_data['1h']
if len(df) > 20:
technical_indicators['sma_20'] = df['close'].rolling(20).mean().iloc[-1]
technical_indicators['rsi'] = self._calculate_rsi(df['close'])
# Get current prices
eth_price = self.data_provider.get_current_price('ETH/USDT')
btc_price = self.data_provider.get_current_price('BTC/USDT')
# Prepare CNN input
cnn_input = self._prepare_cnn_input_data(ohlcv_data, cob_data, technical_indicators)
# Prepare RL input
rl_input = self._prepare_rl_input_data(ohlcv_data, cob_data, technical_indicators)
# Prepare generic input
generic_input = {
'symbol': symbol,
'current_price': current_price,
'ohlcv_data': ohlcv_data,
'cob_data': cob_data,
'technical_indicators': technical_indicators
}
input_data = {
'cnn_input': cnn_input,
'rl_input': rl_input,
'generic_input': generic_input,
# Create standardized input package
standardized_input = {
'timestamp': datetime.now(),
'symbol': symbol
'primary_symbol': 'ETH/USDT',
'reference_symbol': 'BTC/USDT',
'eth_data': eth_data,
'btc_data': btc_data,
'current_prices': {
'ETH': eth_price,
'BTC': btc_price
},
'data_completeness': {
'eth_timeframes': len(eth_data),
'btc_reference': len(btc_data),
'total_expected': 5 # 4 ETH + 1 BTC
}
}
return input_data
return standardized_input
except Exception as e:
logger.error(f"Error collecting model input data for {symbol}: {e}")
logger.error(f"Error collecting standardized model input data: {e}")
return {}
async def _store_inference_data_async(self, model_name: str, model_input: Any, prediction: Prediction, timestamp: datetime):
"""Store inference data per-model with async file operations and memory optimization"""
try:
# Create comprehensive inference record
inference_record = {
'timestamp': timestamp.isoformat(),
'model_name': model_name,
'model_input': model_input,
'prediction': {
'action': prediction.action,
'confidence': prediction.confidence,
'probabilities': prediction.probabilities,
'timeframe': prediction.timeframe
},
'metadata': prediction.metadata or {}
}
# Store in memory (only last 5 per model)
if model_name not in self.inference_history:
self.inference_history[model_name] = deque(maxlen=self.max_memory_inferences)
self.inference_history[model_name].append(inference_record)
# Async file storage (don't wait for completion)
asyncio.create_task(self._save_inference_to_disk_async(model_name, inference_record))
logger.debug(f"Stored inference data for {model_name} (memory: {len(self.inference_history[model_name])}/5)")
except Exception as e:
logger.error(f"Error storing inference data for {model_name}: {e}")
async def _save_inference_to_disk_async(self, model_name: str, inference_record: Dict):
"""Async save inference record to disk with file capping"""
try:
# Create model-specific directory
model_dir = Path(f"training_data/inference_history/{model_name}")
model_dir.mkdir(parents=True, exist_ok=True)
# Create filename with timestamp
timestamp_str = datetime.fromisoformat(inference_record['timestamp']).strftime('%Y%m%d_%H%M%S_%f')[:-3]
filename = f"inference_{timestamp_str}.json"
filepath = model_dir / filename
# Convert to JSON-serializable format
serializable_record = self._make_json_serializable(inference_record)
# Save to file
with open(filepath, 'w') as f:
json.dump(serializable_record, f, indent=2)
# Cap files per model (keep only latest 200)
await self._cap_model_files(model_dir)
logger.debug(f"Saved inference record to disk: {filepath}")
except Exception as e:
logger.error(f"Error saving inference to disk for {model_name}: {e}")
async def _cap_model_files(self, model_dir: Path):
"""Cap the number of files per model to max_disk_files_per_model"""
try:
# Get all inference files
files = list(model_dir.glob("inference_*.json"))
if len(files) > self.max_disk_files_per_model:
# Sort by modification time (oldest first)
files.sort(key=lambda x: x.stat().st_mtime)
# Remove oldest files
files_to_remove = files[:-self.max_disk_files_per_model]
for file_path in files_to_remove:
file_path.unlink()
logger.debug(f"Removed {len(files_to_remove)} old inference files from {model_dir.name}")
except Exception as e:
logger.error(f"Error capping model files in {model_dir}: {e}")
def _prepare_cnn_input_data(self, ohlcv_data: Dict, cob_data: Any, technical_indicators: Dict) -> np.ndarray:
"""Prepare standardized input data for CNN models"""
try:
@@ -1350,6 +1424,35 @@ class TradingOrchestrator:
logger.error(f"Error loading inference history from disk: {e}")
return []
async def load_model_inference_history(self, model_name: str, limit: int = 50) -> List[Dict]:
"""Load inference history for a specific model from disk"""
try:
model_dir = Path(f"training_data/inference_history/{model_name}")
if not model_dir.exists():
return []
# Get all inference files
files = list(model_dir.glob("inference_*.json"))
files.sort(key=lambda x: x.stat().st_mtime, reverse=True) # Newest first
# Load up to 'limit' files
inference_records = []
for filepath in files[:limit]:
try:
with open(filepath, 'r') as f:
record = json.load(f)
inference_records.append(record)
except Exception as e:
logger.warning(f"Error loading inference file {filepath}: {e}")
continue
logger.info(f"Loaded {len(inference_records)} inference records for {model_name}")
return inference_records
except Exception as e:
logger.error(f"Error loading model inference history for {model_name}: {e}")
return []
def get_model_training_data(self, model_name: str, symbol: str = None) -> List[Dict]:
"""Get training data for a specific model"""
try:

View File

@@ -16,6 +16,18 @@ ETH:
300s of 1s OHLCV data (5 min)
300 OHLCV + indicator bars for each of 1m, 1h, 1d, and 1s BTC
so:
# Standardized input for all models:
{
'primary_symbol': 'ETH/USDT',
'reference_symbol': 'BTC/USDT',
'eth_data': {'ETH_1s': df, 'ETH_1m': df, 'ETH_1h': df, 'ETH_1d': df},
'btc_data': {'BTC_1s': df},
'current_prices': {'ETH': price, 'BTC': price},
'data_completeness': {...}
}
The RL model should also have access to the last hidden layers of the CNN model, where patterns are learned (these can be empty if the CNN model is inactive or missing), as well as the CNN model's outputs (predictions) for each timeframe (1s, 1m, 1h, 1d) and the next expected pivot point.
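For illustration, a hedged sketch of how the RL state could concatenate those CNN hidden features and per-timeframe predictions, zero-filled when the CNN is unavailable (all names and sizes are assumptions):
from typing import Optional
import numpy as np

CNN_HIDDEN_DIM = 64                      # assumed size, not the real dimension
TIMEFRAMES = ('1s', '1m', '1h', '1d')

def build_rl_state(market_features: np.ndarray,
                   cnn_hidden: Optional[np.ndarray],
                   cnn_predictions: Optional[dict],
                   next_pivot: Optional[float]) -> np.ndarray:
    # Zero-fill the CNN slots when the CNN model is inactive or missing
    hidden = cnn_hidden if cnn_hidden is not None else np.zeros(CNN_HIDDEN_DIM)
    preds = np.array([(cnn_predictions or {}).get(tf, 0.0) for tf in TIMEFRAMES])
    pivot = np.array([next_pivot if next_pivot is not None else 0.0])
    return np.concatenate([market_features, hidden, preds, pivot])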
## CNN model