wip, training still disabled
This commit is contained in:
@ -199,6 +199,7 @@
|
|||||||
|
|
||||||
- [x] 5.2. Implement persistent inference history storage
|
- [x] 5.2. Implement persistent inference history storage
|
||||||
|
|
||||||
|
|
||||||
- Create InferenceHistoryStore class for persistent storage
|
- Create InferenceHistoryStore class for persistent storage
|
||||||
- Store complete input data packages with each prediction
|
- Store complete input data packages with each prediction
|
||||||
- Include timestamp, symbol, input features, prediction outputs, confidence scores
|
- Include timestamp, symbol, input features, prediction outputs, confidence scores
|
||||||
|
@ -224,7 +224,7 @@ class TradingOrchestrator:
|
|||||||
logger.info(f"Training enabled: {self.training_enabled}")
|
logger.info(f"Training enabled: {self.training_enabled}")
|
||||||
logger.info(f"Confidence threshold: {self.confidence_threshold}")
|
logger.info(f"Confidence threshold: {self.confidence_threshold}")
|
||||||
# logger.info(f"Decision frequency: {self.decision_frequency}s")
|
# logger.info(f"Decision frequency: {self.decision_frequency}s")
|
||||||
logger.info(f"Symbols: {self.symbols}")
|
logger.info(f"Primary symbol: {self.symbol}, Reference symbols: {self.ref_symbols}")
|
||||||
logger.info("Universal Data Adapter integrated for centralized data flow")
|
logger.info("Universal Data Adapter integrated for centralized data flow")
|
||||||
|
|
||||||
# Start centralized data collection for all models and dashboard
|
# Start centralized data collection for all models and dashboard
|
||||||
@ -366,7 +366,7 @@ class TradingOrchestrator:
|
|||||||
from core.extrema_trainer import ExtremaTrainer
|
from core.extrema_trainer import ExtremaTrainer
|
||||||
self.extrema_trainer = ExtremaTrainer(
|
self.extrema_trainer = ExtremaTrainer(
|
||||||
data_provider=self.data_provider,
|
data_provider=self.data_provider,
|
||||||
symbols=self.symbols
|
symbols=[self.symbol] # Only primary trading symbol
|
||||||
)
|
)
|
||||||
|
|
||||||
# Load checkpoint and capture initial state
|
# Load checkpoint and capture initial state
|
||||||
@ -618,7 +618,7 @@ class TradingOrchestrator:
|
|||||||
async def start_continuous_trading(self, symbols: Optional[List[str]] = None):
|
async def start_continuous_trading(self, symbols: Optional[List[str]] = None):
|
||||||
"""Start the continuous trading loop, using a decision model and trading executor"""
|
"""Start the continuous trading loop, using a decision model and trading executor"""
|
||||||
if symbols is None:
|
if symbols is None:
|
||||||
symbols = self.symbols
|
symbols = [self.symbol] # Only trade the primary symbol
|
||||||
|
|
||||||
if not self.realtime_processing_task:
|
if not self.realtime_processing_task:
|
||||||
self.realtime_processing_task = asyncio.create_task(self._trading_decision_loop())
|
self.realtime_processing_task = asyncio.create_task(self._trading_decision_loop())
|
||||||
@ -639,9 +639,9 @@ class TradingOrchestrator:
|
|||||||
logger.info("Trading decision loop started")
|
logger.info("Trading decision loop started")
|
||||||
while self.running:
|
while self.running:
|
||||||
try:
|
try:
|
||||||
for symbol in self.symbols:
|
# Only make decisions for the primary trading symbol
|
||||||
await self.make_trading_decision(symbol)
|
await self.make_trading_decision(self.symbol)
|
||||||
await asyncio.sleep(1) # Small delay between symbols
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
await asyncio.sleep(self.decision_frequency)
|
await asyncio.sleep(self.decision_frequency)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -768,7 +768,7 @@ class TradingOrchestrator:
|
|||||||
if COB_INTEGRATION_AVAILABLE and COBIntegration is not None:
|
if COB_INTEGRATION_AVAILABLE and COBIntegration is not None:
|
||||||
try:
|
try:
|
||||||
self.cob_integration = COBIntegration(
|
self.cob_integration = COBIntegration(
|
||||||
symbols=self.symbols,
|
symbols=[self.symbol] + self.ref_symbols, # Primary + reference symbols
|
||||||
data_provider=self.data_provider
|
data_provider=self.data_provider
|
||||||
)
|
)
|
||||||
logger.info("COB Integration initialized")
|
logger.info("COB Integration initialized")
|
||||||
@ -1498,12 +1498,22 @@ class TradingOrchestrator:
|
|||||||
async def _trigger_model_training(self, symbol: str):
|
async def _trigger_model_training(self, symbol: str):
|
||||||
"""Trigger training for models based on previous inference data"""
|
"""Trigger training for models based on previous inference data"""
|
||||||
try:
|
try:
|
||||||
if not self.training_enabled or symbol not in self.inference_history:
|
if not self.training_enabled:
|
||||||
|
logger.debug("Training disabled, skipping model training")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Get recent inference records
|
# Check if we have any inference history for any model
|
||||||
recent_records = list(self.inference_history[symbol])
|
if not self.inference_history:
|
||||||
if len(recent_records) < 2:
|
logger.debug("No inference history available for training")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Get recent inference records from all models (not symbol-based)
|
||||||
|
all_recent_records = []
|
||||||
|
for model_name, model_records in self.inference_history.items():
|
||||||
|
all_recent_records.extend(list(model_records))
|
||||||
|
|
||||||
|
if len(all_recent_records) < 2:
|
||||||
|
logger.debug("Not enough inference records for training")
|
||||||
return # Need at least 2 records to compare
|
return # Need at least 2 records to compare
|
||||||
|
|
||||||
# Get current price for outcome evaluation
|
# Get current price for outcome evaluation
|
||||||
|
Reference in New Issue
Block a user