#!/usr/bin/env python3
"""
Streamlined Trading System - Web Dashboard + Training

Integrated system with both a training loop and a web dashboard:
- Training Pipeline: Data -> COB -> Indicators -> CNN -> RL -> Orchestrator -> Execution
- Web Dashboard: Real-time monitoring and control interface
- 2-Action System: BUY/SELL with intelligent position management
- Always-invested approach with smart risk/reward setup detection

Usage:
    python main.py [--symbol ETH/USDT] [--port 8050]
"""

import os
# Fix OpenMP library conflicts before importing other modules
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
os.environ['OMP_NUM_THREADS'] = '4'
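# Note: KMP_DUPLICATE_LIB_OK works around the "libiomp5 already initialized" abort that can
# occur when PyTorch and MKL/NumPy each bundle their own OpenMP runtime; OMP_NUM_THREADS caps
# the OpenMP worker threads so CPU usage stays predictable.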

import asyncio
import argparse
import logging
import sys
from pathlib import Path
from threading import Thread
import time

# Add project root to path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))

from core.config import get_config, setup_logging, Config
from core.data_provider import DataProvider

# Import checkpoint management
from utils.checkpoint_manager import get_checkpoint_manager
from utils.training_integration import get_training_integration

logger = logging.getLogger(__name__)

async def run_web_dashboard():
    """Run the streamlined trading pipeline with the 2-action system and always-invested approach (the dashboard UI itself is started separately by start_web_ui)"""
    try:
        logger.info("Starting Streamlined Trading Dashboard...")
        logger.info("2-Action System: BUY/SELL with intelligent position management")
        logger.info("Always Invested Approach: Smart risk/reward setup detection")
        logger.info("Integrated Training Pipeline: Live data -> Models -> Trading")

        # Get configuration
        config = get_config()

        # Initialize core components for streamlined pipeline
        from core.data_provider import DataProvider
        from core.enhanced_orchestrator import EnhancedTradingOrchestrator
        from core.trading_executor import TradingExecutor

        # Create data provider
        data_provider = DataProvider()

        # Start real-time streaming for BOM caching
        try:
            await data_provider.start_real_time_streaming()
            logger.info("[SUCCESS] Real-time data streaming started for BOM caching")
        except Exception as e:
            logger.warning(f"[WARNING] Real-time streaming failed: {e}")

        # Verify data connection
        logger.info("[DATA] Verifying live data connection...")
        symbol = config.get('symbols', ['ETH/USDT'])[0]
        test_df = data_provider.get_historical_data(symbol, '1m', limit=10)
        if test_df is not None and len(test_df) > 0:
            logger.info("[SUCCESS] Data connection verified")
            logger.info(f"[SUCCESS] Fetched {len(test_df)} candles for validation")
        else:
            logger.error("[ERROR] Data connection failed - no live data available")
            return

        # Load model registry for integrated pipeline
        try:
            from models import get_model_registry
            model_registry = {}  # Use simple dict for now
            logger.info("[MODELS] Model registry initialized for training")
        except ImportError:
            model_registry = {}
            logger.warning("Model registry not available, using empty registry")

        # Initialize checkpoint management
        checkpoint_manager = get_checkpoint_manager()
        training_integration = get_training_integration()
        logger.info("Checkpoint management initialized for training pipeline")

        # Create streamlined orchestrator with 2-action system and always-invested approach
        orchestrator = EnhancedTradingOrchestrator(
            data_provider=data_provider,
            symbols=config.get('symbols', ['ETH/USDT']),
            enhanced_rl_training=True,
            model_registry=model_registry
        )
        logger.info("Enhanced Trading Orchestrator with 2-Action System initialized")
        logger.info("Always Invested: Learning to spot high risk/reward setups")

        # Checkpoint management will be handled in the training loop
        logger.info("Checkpoint management will be initialized in training loop")

        # Start COB integration for real-time market microstructure
        try:
            # Create and start COB integration task
            cob_task = asyncio.create_task(orchestrator.start_cob_integration())
            logger.info("COB Integration startup task created")
        except Exception as e:
            logger.warning(f"COB Integration startup failed (will retry): {e}")

        # Create trading executor for live execution
        trading_executor = TradingExecutor()

        # Start the training and monitoring loop
        logger.info("Starting Enhanced Training Pipeline")
        logger.info("Live Data Processing: ENABLED")
        logger.info("COB Integration: ENABLED (Real-time market microstructure)")
        logger.info("Integrated CNN Training: ENABLED")
        logger.info("Integrated RL Training: ENABLED")
        logger.info("Real-time Indicators & Pivots: ENABLED")
        logger.info("Live Trading Execution: ENABLED")
        logger.info("2-Action System: BUY/SELL with position intelligence")
        logger.info("Always Invested: Different thresholds for entry/exit")
        logger.info("Pipeline: Data -> COB -> Indicators -> CNN -> RL -> Orchestrator -> Execution")
        logger.info("Starting training loop...")

        # Start the training loop
        await start_training_loop(orchestrator, trading_executor)

    except Exception as e:
        logger.error(f"Error in streamlined dashboard: {e}")
        logger.error("Training stopped")
        import traceback
        logger.error(traceback.format_exc())

def start_web_ui(port=8051):
    """Start the main TradingDashboard UI in a separate thread"""
    try:
        logger.info("=" * 50)
        logger.info("Starting Main Trading Dashboard UI...")
        logger.info(f"Trading Dashboard: http://127.0.0.1:{port}")
        logger.info("COB Integration: ENABLED (Real-time order book visualization)")
        logger.info("=" * 50)

        # Import and create the Clean Trading Dashboard with COB integration
        from web.clean_dashboard import CleanTradingDashboard
        from core.data_provider import DataProvider
        from core.enhanced_orchestrator import EnhancedTradingOrchestrator  # Use enhanced version with COB
        from core.trading_executor import TradingExecutor

        # Initialize components for the dashboard
        config = get_config()
        data_provider = DataProvider()

        # Start real-time streaming for BOM caching (non-blocking)
        try:
            import threading
            def start_streaming():
                import asyncio
                asyncio.run(data_provider.start_real_time_streaming())
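                # asyncio.run() spins up its own event loop inside this worker thread,
                # independent of the event loop driving the training pipeline.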

            streaming_thread = threading.Thread(target=start_streaming, daemon=True)
            streaming_thread.start()
            logger.info("[SUCCESS] Real-time streaming thread started for dashboard")
        except Exception as e:
            logger.warning(f"[WARNING] Dashboard streaming setup failed: {e}")

        # Load model registry for enhanced features
        try:
            from models import get_model_registry
            model_registry = {}  # Use simple dict for now
        except ImportError:
            model_registry = {}

        # Initialize checkpoint management for dashboard
        dashboard_checkpoint_manager = get_checkpoint_manager()
        dashboard_training_integration = get_training_integration()

        # Create enhanced orchestrator for the dashboard (WITH COB integration)
        dashboard_orchestrator = EnhancedTradingOrchestrator(
            data_provider=data_provider,
            symbols=config.get('symbols', ['ETH/USDT']),
            enhanced_rl_training=True,  # Enable RL training display
            model_registry=model_registry
        )

        trading_executor = TradingExecutor("config.yaml")

        # Create the clean trading dashboard with enhanced features
        dashboard = CleanTradingDashboard(
            data_provider=data_provider,
            orchestrator=dashboard_orchestrator,
            trading_executor=trading_executor
        )

        logger.info("Clean Trading Dashboard created successfully")
        logger.info("Features: Live trading, COB visualization, RL training monitoring, Position management")
        logger.info("✅ Checkpoint management integrated for training persistence")

        # Run the dashboard server (COB integration will start automatically)
        dashboard.run_server(host='127.0.0.1', port=port, debug=False)
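        # run_server() is assumed to block this thread for the lifetime of the dashboard,
        # like the Dash/Flask development server it presumably wraps.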

    except Exception as e:
        logger.error(f"Error starting main trading dashboard UI: {e}")
        import traceback
        logger.error(traceback.format_exc())

async def start_training_loop(orchestrator, trading_executor):
    """Start the main training and monitoring loop with checkpoint management"""
    logger.info("=" * 70)
    logger.info("STARTING ENHANCED TRAINING LOOP WITH COB INTEGRATION")
    logger.info("=" * 70)

    # Initialize checkpoint management for training loop
    checkpoint_manager = get_checkpoint_manager()
    training_integration = get_training_integration()

    # Training statistics for checkpoint management
    training_stats = {
        'iteration_count': 0,
        'total_decisions': 0,
        'successful_trades': 0,
        'best_performance': 0.0,
        'last_checkpoint_iteration': 0
    }
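    # These in-memory counters drive the checkpoint cadence below:
    # every 50 iterations, on a significant improvement, or at the latest every 100 iterations.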

    try:
        # Start real-time processing
        await orchestrator.start_realtime_processing()

        # Main training loop
        iteration = 0
        while True:
            iteration += 1
            training_stats['iteration_count'] = iteration

            logger.info(f"Training iteration {iteration}")

            # Make coordinated decisions (this triggers CNN and RL training)
            decisions = await orchestrator.make_coordinated_decisions()

            # Process decisions and collect training metrics
            iteration_decisions = 0
            iteration_performance = 0.0

            # Log decisions and performance
            for symbol, decision in decisions.items():
                if decision:
                    iteration_decisions += 1
                    logger.info(f"{symbol}: {decision.action} (confidence: {decision.confidence:.3f})")

                    # Track performance for checkpoint management
                    iteration_performance += decision.confidence

                    # Execute if confidence is high enough
                    if decision.confidence > 0.7:
                        logger.info(f"Executing {symbol}: {decision.action}")
                        training_stats['successful_trades'] += 1
                        # trading_executor.execute_action(decision)
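                        # NOTE: live order execution is currently disabled here; only the
                        # successful_trades counter is updated for high-confidence decisions.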

            # Update training statistics
            training_stats['total_decisions'] += iteration_decisions
            # Check for a significant improvement BEFORE updating the best score,
            # otherwise the 10% comparison below could never evaluate to True
            significant_improvement = iteration_performance > training_stats['best_performance'] * 1.1
            if iteration_performance > training_stats['best_performance']:
                training_stats['best_performance'] = iteration_performance

            # Save checkpoint every 50 iterations or when performance improves significantly
            should_save_checkpoint = (
                iteration % 50 == 0 or  # Regular interval
                significant_improvement or  # 10% improvement over the previous best
                iteration - training_stats['last_checkpoint_iteration'] >= 100  # Force save every 100 iterations
            )

            if should_save_checkpoint:
                try:
                    # Create performance metrics for checkpoint
                    performance_metrics = {
                        'avg_confidence': iteration_performance / max(iteration_decisions, 1),
                        'success_rate': training_stats['successful_trades'] / max(training_stats['total_decisions'], 1),
                        'total_decisions': training_stats['total_decisions'],
                        'iteration': iteration
                    }

                    # Save orchestrator state (if it has models)
                    if hasattr(orchestrator, 'rl_agent') and orchestrator.rl_agent:
                        saved = orchestrator.rl_agent.save_checkpoint(iteration_performance)
                        if saved:
                            logger.info(f"✅ RL Agent checkpoint saved at iteration {iteration}")

                    if hasattr(orchestrator, 'cnn_model') and orchestrator.cnn_model:
                        # Simulate CNN checkpoint save
                        logger.info(f"✅ CNN Model training state saved at iteration {iteration}")

                    if hasattr(orchestrator, 'extrema_trainer') and orchestrator.extrema_trainer:
                        saved = orchestrator.extrema_trainer.save_checkpoint()
                        if saved:
                            logger.info(f"✅ ExtremaTrainer checkpoint saved at iteration {iteration}")

                    training_stats['last_checkpoint_iteration'] = iteration
                    logger.info(f"📊 Checkpoint management completed for iteration {iteration}")

                except Exception as e:
                    logger.warning(f"Checkpoint saving failed at iteration {iteration}: {e}")

            # Log performance metrics every 10 iterations
            if iteration % 10 == 0:
                metrics = orchestrator.get_performance_metrics()
                logger.info(f"Performance metrics: {metrics}")

                # Log training statistics
                logger.info(f"Training stats: {training_stats}")

                # Log checkpoint statistics
                checkpoint_stats = checkpoint_manager.get_checkpoint_stats()
                logger.info(f"Checkpoints: {checkpoint_stats['total_checkpoints']} total, "
                            f"{checkpoint_stats['total_size_mb']:.2f} MB")

                # Log COB integration status
                for symbol in orchestrator.symbols:
                    cob_features = orchestrator.latest_cob_features.get(symbol)
                    cob_state = orchestrator.latest_cob_state.get(symbol)
                    if cob_features is not None:
                        logger.info(f"{symbol} COB: CNN features {cob_features.shape}, DQN state {cob_state.shape if cob_state is not None else 'None'}")

            # Sleep between iterations
            await asyncio.sleep(5)  # 5 second intervals

    except KeyboardInterrupt:
        logger.info("Training interrupted by user")
    except Exception as e:
        logger.error(f"Error in training loop: {e}")
        import traceback
        logger.error(traceback.format_exc())
    finally:
        # Save final checkpoints before shutdown
        try:
            logger.info("Saving final checkpoints before shutdown...")

            if hasattr(orchestrator, 'rl_agent') and orchestrator.rl_agent:
                orchestrator.rl_agent.save_checkpoint(0.0, force_save=True)
                logger.info("✅ Final RL Agent checkpoint saved")

            if hasattr(orchestrator, 'extrema_trainer') and orchestrator.extrema_trainer:
                orchestrator.extrema_trainer.save_checkpoint(force_save=True)
                logger.info("✅ Final ExtremaTrainer checkpoint saved")

            # Log final checkpoint statistics
            final_stats = checkpoint_manager.get_checkpoint_stats()
            logger.info(f"📊 Final checkpoint stats: {final_stats['total_checkpoints']} checkpoints, "
                        f"{final_stats['total_size_mb']:.2f} MB total")

        except Exception as e:
            logger.warning(f"Error saving final checkpoints: {e}")

        await orchestrator.stop_realtime_processing()
        await orchestrator.stop_cob_integration()
        logger.info("Training loop stopped with checkpoint management")

async def main():
    """Main entry point with both training loop and web dashboard"""
    parser = argparse.ArgumentParser(description='Streamlined Trading System - Training + Web Dashboard')
    parser.add_argument('--symbol', type=str, default='ETH/USDT',
                        help='Primary trading symbol (default: ETH/USDT)')
    parser.add_argument('--port', type=int, default=8050,
                        help='Web dashboard port (default: 8050)')
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug mode')

    args = parser.parse_args()

    # Setup logging and ensure directories exist
    Path("logs").mkdir(exist_ok=True)
    Path("NN/models/saved").mkdir(parents=True, exist_ok=True)
    setup_logging()

    try:
        logger.info("=" * 70)
        logger.info("STREAMLINED TRADING SYSTEM - TRAINING + MAIN DASHBOARD")
        logger.info(f"Primary Symbol: {args.symbol}")
        logger.info(f"Web Dashboard Port: {args.port}")
        logger.info(f"Main Trading Dashboard: http://127.0.0.1:{args.port}")
        logger.info("2-Action System: BUY/SELL with intelligent position management")
        logger.info("Always Invested: Learning to spot high risk/reward setups")
        logger.info("Flow: Data -> COB -> Indicators -> CNN -> RL -> Orchestrator -> Execution")
        logger.info("Main Dashboard: Live trading, RL monitoring, Position management")
        logger.info("🔄 Checkpoint Management: Automatic training state persistence")
        # logger.info("📊 W&B Integration: Optional experiment tracking")
        logger.info("💾 Model Rotation: Keep best 5 checkpoints per model")
        logger.info("=" * 70)

        # Start main trading dashboard UI in a separate thread
        web_thread = Thread(target=lambda: start_web_ui(args.port), daemon=True)
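        # daemon=True means the dashboard thread is torn down automatically when the main process exits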
        web_thread.start()
        logger.info("Main trading dashboard UI thread started")

        # Give web UI time to start
        await asyncio.sleep(2)

        # Run the training loop (this will run indefinitely)
        await run_web_dashboard()

        logger.info("[SUCCESS] Operation completed successfully!")

    except KeyboardInterrupt:
        logger.info("System shutdown requested by user")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return 1

    return 0

if __name__ == "__main__":
    sys.exit(asyncio.run(main()))