384 lines
15 KiB
Python
384 lines
15 KiB
Python
"""
|
|
Test MEXC Trading Integration
|
|
|
|
This script tests the integration between the enhanced orchestrator and MEXC trading executor.
|
|
It verifies that trading signals can be executed through the MEXC API with proper risk management.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
|
|
# Add core directory to path
|
|
sys.path.append(os.path.join(os.path.dirname(__file__), 'core'))
|
|
|
|
from core.trading_executor import TradingExecutor, Position, TradeRecord
|
|
from core.enhanced_orchestrator import EnhancedTradingOrchestrator
|
|
from core.data_provider import DataProvider
|
|
from core.config import get_config
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler("test_mexc_trading.log"),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
logger = logging.getLogger("mexc_trading_test")
|
|
|
|
class TradingIntegrationTest:
|
|
"""Test class for MEXC trading integration"""
|
|
|
|
def __init__(self):
|
|
"""Initialize the test environment"""
|
|
self.config = get_config()
|
|
self.data_provider = DataProvider()
|
|
self.orchestrator = EnhancedTradingOrchestrator(self.data_provider)
|
|
self.trading_executor = TradingExecutor()
|
|
|
|
# Test configuration
|
|
self.test_symbol = 'ETH/USDT'
|
|
self.test_confidence = 0.75
|
|
|
|
def test_trading_executor_initialization(self):
|
|
"""Test that the trading executor initializes correctly"""
|
|
logger.info("Testing trading executor initialization...")
|
|
|
|
try:
|
|
# Check configuration
|
|
assert self.trading_executor.mexc_config is not None
|
|
logger.info("✅ MEXC configuration loaded")
|
|
|
|
# Check dry run mode
|
|
assert self.trading_executor.dry_run == True
|
|
logger.info("✅ Dry run mode enabled for safety")
|
|
|
|
# Check position limits
|
|
max_position_value = self.trading_executor.mexc_config.get('max_position_value_usd', 1.0)
|
|
assert max_position_value == 1.0
|
|
logger.info(f"✅ Max position value set to ${max_position_value}")
|
|
|
|
# Check safety features
|
|
assert self.trading_executor.mexc_config.get('emergency_stop', False) == False
|
|
logger.info("✅ Emergency stop not active")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Trading executor initialization test failed: {e}")
|
|
return False
|
|
|
|
def test_exchange_connection(self):
|
|
"""Test connection to MEXC exchange"""
|
|
logger.info("Testing MEXC exchange connection...")
|
|
|
|
try:
|
|
# Test ticker retrieval
|
|
ticker = self.trading_executor.exchange.get_ticker(self.test_symbol)
|
|
|
|
if ticker:
|
|
logger.info(f"✅ Successfully retrieved ticker for {self.test_symbol}")
|
|
logger.info(f" Current price: ${ticker['last']:.2f}")
|
|
logger.info(f" Bid: ${ticker['bid']:.2f}, Ask: ${ticker['ask']:.2f}")
|
|
return True
|
|
else:
|
|
logger.error(f"❌ Failed to retrieve ticker for {self.test_symbol}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Exchange connection test failed: {e}")
|
|
return False
|
|
|
|
def test_position_size_calculation(self):
|
|
"""Test position size calculation with different confidence levels"""
|
|
logger.info("Testing position size calculation...")
|
|
|
|
try:
|
|
test_price = 2500.0
|
|
test_cases = [
|
|
(0.5, "Medium confidence"),
|
|
(0.75, "High confidence"),
|
|
(0.9, "Very high confidence"),
|
|
(0.3, "Low confidence")
|
|
]
|
|
|
|
for confidence, description in test_cases:
|
|
position_value = self.trading_executor._calculate_position_size(confidence, test_price)
|
|
quantity = position_value / test_price
|
|
|
|
logger.info(f" {description} ({confidence:.1f}): ${position_value:.2f} = {quantity:.6f} ETH")
|
|
|
|
# Verify position value is within limits
|
|
max_value = self.trading_executor.mexc_config.get('max_position_value_usd', 1.0)
|
|
min_value = self.trading_executor.mexc_config.get('min_position_value_usd', 0.1)
|
|
|
|
assert min_value <= position_value <= max_value
|
|
|
|
logger.info("✅ Position size calculation working correctly")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Position size calculation test failed: {e}")
|
|
return False
|
|
|
|
def test_dry_run_trading(self):
|
|
"""Test dry run trading execution"""
|
|
logger.info("Testing dry run trading execution...")
|
|
|
|
try:
|
|
# Get current price
|
|
ticker = self.trading_executor.exchange.get_ticker(self.test_symbol)
|
|
if not ticker:
|
|
logger.error("Cannot get current price for testing")
|
|
return False
|
|
|
|
current_price = ticker['last']
|
|
|
|
# Test BUY signal
|
|
logger.info(f"Testing BUY signal for {self.test_symbol} at ${current_price:.2f}")
|
|
buy_success = self.trading_executor.execute_signal(
|
|
symbol=self.test_symbol,
|
|
action='BUY',
|
|
confidence=self.test_confidence,
|
|
current_price=current_price
|
|
)
|
|
|
|
if buy_success:
|
|
logger.info("✅ BUY signal executed successfully in dry run mode")
|
|
|
|
# Check position was created
|
|
positions = self.trading_executor.get_positions()
|
|
assert self.test_symbol in positions
|
|
position = positions[self.test_symbol]
|
|
logger.info(f" Position created: {position.side} {position.quantity:.6f} @ ${position.entry_price:.2f}")
|
|
else:
|
|
logger.error("❌ BUY signal execution failed")
|
|
return False
|
|
|
|
# Wait a moment
|
|
time.sleep(1)
|
|
|
|
# Test SELL signal
|
|
logger.info(f"Testing SELL signal for {self.test_symbol}")
|
|
sell_success = self.trading_executor.execute_signal(
|
|
symbol=self.test_symbol,
|
|
action='SELL',
|
|
confidence=self.test_confidence,
|
|
current_price=current_price * 1.01 # Simulate 1% price increase
|
|
)
|
|
|
|
if sell_success:
|
|
logger.info("✅ SELL signal executed successfully in dry run mode")
|
|
|
|
# Check position was closed
|
|
positions = self.trading_executor.get_positions()
|
|
assert self.test_symbol not in positions
|
|
|
|
# Check trade history
|
|
trade_history = self.trading_executor.get_trade_history()
|
|
assert len(trade_history) > 0
|
|
|
|
last_trade = trade_history[-1]
|
|
logger.info(f" Trade completed: P&L ${last_trade.pnl:.2f}")
|
|
else:
|
|
logger.error("❌ SELL signal execution failed")
|
|
return False
|
|
|
|
logger.info("✅ Dry run trading test completed successfully")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Dry run trading test failed: {e}")
|
|
return False
|
|
|
|
def test_safety_conditions(self):
|
|
"""Test safety condition checks"""
|
|
logger.info("Testing safety condition checks...")
|
|
|
|
try:
|
|
# Test symbol allowlist
|
|
disallowed_symbol = 'DOGE/USDT'
|
|
result = self.trading_executor._check_safety_conditions(disallowed_symbol, 'BUY')
|
|
if disallowed_symbol not in self.trading_executor.mexc_config.get('allowed_symbols', []):
|
|
assert result == False
|
|
logger.info("✅ Symbol allowlist check working")
|
|
|
|
# Test trade interval
|
|
# First trade should succeed
|
|
current_price = 2500.0
|
|
self.trading_executor.execute_signal(self.test_symbol, 'BUY', 0.7, current_price)
|
|
|
|
# Immediate second trade should fail due to interval
|
|
result = self.trading_executor._check_safety_conditions(self.test_symbol, 'BUY')
|
|
# Note: This might pass if interval is very short, which is fine for testing
|
|
|
|
logger.info("✅ Safety condition checks working")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Safety condition test failed: {e}")
|
|
return False
|
|
|
|
def test_daily_statistics(self):
|
|
"""Test daily statistics tracking"""
|
|
logger.info("Testing daily statistics tracking...")
|
|
|
|
try:
|
|
stats = self.trading_executor.get_daily_stats()
|
|
|
|
required_keys = ['daily_trades', 'daily_loss', 'total_pnl', 'winning_trades',
|
|
'losing_trades', 'win_rate', 'positions_count']
|
|
|
|
for key in required_keys:
|
|
assert key in stats
|
|
|
|
logger.info("✅ Daily statistics structure correct")
|
|
logger.info(f" Daily trades: {stats['daily_trades']}")
|
|
logger.info(f" Total P&L: ${stats['total_pnl']:.2f}")
|
|
logger.info(f" Win rate: {stats['win_rate']:.1%}")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Daily statistics test failed: {e}")
|
|
return False
|
|
|
|
async def test_orchestrator_integration(self):
|
|
"""Test integration with enhanced orchestrator"""
|
|
logger.info("Testing orchestrator integration...")
|
|
|
|
try:
|
|
# Test that orchestrator can make decisions
|
|
decisions = await self.orchestrator.make_coordinated_decisions()
|
|
|
|
logger.info(f"✅ Orchestrator made decisions for {len(decisions)} symbols")
|
|
|
|
for symbol, decision in decisions.items():
|
|
if decision:
|
|
logger.info(f" {symbol}: {decision.action} (confidence: {decision.confidence:.3f})")
|
|
|
|
# Test executing the decision through trading executor
|
|
if decision.action != 'HOLD':
|
|
success = self.trading_executor.execute_signal(
|
|
symbol=symbol,
|
|
action=decision.action,
|
|
confidence=decision.confidence,
|
|
current_price=decision.price
|
|
)
|
|
|
|
if success:
|
|
logger.info(f" ✅ Successfully executed {decision.action} for {symbol}")
|
|
else:
|
|
logger.info(f" ⚠️ Trade execution blocked by safety conditions for {symbol}")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Orchestrator integration test failed: {e}")
|
|
return False
|
|
|
|
def test_emergency_stop(self):
|
|
"""Test emergency stop functionality"""
|
|
logger.info("Testing emergency stop functionality...")
|
|
|
|
try:
|
|
# Create a test position first
|
|
current_price = 2500.0
|
|
self.trading_executor.execute_signal(self.test_symbol, 'BUY', 0.7, current_price)
|
|
|
|
# Verify position exists
|
|
positions_before = self.trading_executor.get_positions()
|
|
logger.info(f" Positions before emergency stop: {len(positions_before)}")
|
|
|
|
# Trigger emergency stop
|
|
self.trading_executor.emergency_stop()
|
|
|
|
# Verify trading is disabled
|
|
assert self.trading_executor.trading_enabled == False
|
|
logger.info("✅ Trading disabled after emergency stop")
|
|
|
|
# In dry run mode, positions should still be closed
|
|
positions_after = self.trading_executor.get_positions()
|
|
logger.info(f" Positions after emergency stop: {len(positions_after)}")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"❌ Emergency stop test failed: {e}")
|
|
return False
|
|
|
|
async def run_all_tests(self):
|
|
"""Run all integration tests"""
|
|
logger.info("🚀 Starting MEXC Trading Integration Tests")
|
|
logger.info("=" * 60)
|
|
|
|
tests = [
|
|
("Trading Executor Initialization", self.test_trading_executor_initialization),
|
|
("Exchange Connection", self.test_exchange_connection),
|
|
("Position Size Calculation", self.test_position_size_calculation),
|
|
("Dry Run Trading", self.test_dry_run_trading),
|
|
("Safety Conditions", self.test_safety_conditions),
|
|
("Daily Statistics", self.test_daily_statistics),
|
|
("Orchestrator Integration", self.test_orchestrator_integration),
|
|
("Emergency Stop", self.test_emergency_stop),
|
|
]
|
|
|
|
passed = 0
|
|
failed = 0
|
|
|
|
for test_name, test_func in tests:
|
|
logger.info(f"\n📋 Running test: {test_name}")
|
|
logger.info("-" * 40)
|
|
|
|
try:
|
|
if asyncio.iscoroutinefunction(test_func):
|
|
result = await test_func()
|
|
else:
|
|
result = test_func()
|
|
|
|
if result:
|
|
passed += 1
|
|
logger.info(f"✅ {test_name} PASSED")
|
|
else:
|
|
failed += 1
|
|
logger.error(f"❌ {test_name} FAILED")
|
|
|
|
except Exception as e:
|
|
failed += 1
|
|
logger.error(f"❌ {test_name} FAILED with exception: {e}")
|
|
|
|
logger.info("\n" + "=" * 60)
|
|
logger.info("🏁 Test Results Summary")
|
|
logger.info(f"✅ Passed: {passed}")
|
|
logger.info(f"❌ Failed: {failed}")
|
|
logger.info(f"📊 Success Rate: {passed/(passed+failed)*100:.1f}%")
|
|
|
|
if failed == 0:
|
|
logger.info("🎉 All tests passed! MEXC trading integration is ready.")
|
|
else:
|
|
logger.warning(f"⚠️ {failed} test(s) failed. Please review and fix issues before live trading.")
|
|
|
|
return failed == 0
|
|
|
|
async def main():
|
|
"""Main test function"""
|
|
test_runner = TradingIntegrationTest()
|
|
success = await test_runner.run_all_tests()
|
|
|
|
if success:
|
|
logger.info("\n🔧 Next Steps:")
|
|
logger.info("1. Set up your MEXC API keys in .env file")
|
|
logger.info("2. Update config.yaml to enable trading (mexc_trading.enabled: true)")
|
|
logger.info("3. Consider disabling dry_run_mode for live trading")
|
|
logger.info("4. Start with small position sizes for initial live testing")
|
|
logger.info("5. Monitor the system closely during initial live trading")
|
|
|
|
return success
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main()) |