limit max positions
This commit is contained in:
306
position_sync_enhancement.py
Normal file
306
position_sync_enhancement.py
Normal file
@ -0,0 +1,306 @@
|
||||
"""
|
||||
Enhanced Position Synchronization System
|
||||
Addresses the gap between dashboard position display and actual exchange account state
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Optional, Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class EnhancedPositionSync:
|
||||
"""Enhanced position synchronization to ensure dashboard matches actual exchange state"""
|
||||
|
||||
def __init__(self, trading_executor, dashboard):
|
||||
self.trading_executor = trading_executor
|
||||
self.dashboard = dashboard
|
||||
self.last_sync_time = 0
|
||||
self.sync_interval = 10 # Sync every 10 seconds
|
||||
self.position_history = [] # Track position changes
|
||||
|
||||
def sync_all_positions(self) -> Dict[str, Any]:
|
||||
"""Comprehensive position sync for all symbols"""
|
||||
try:
|
||||
sync_results = {}
|
||||
|
||||
# 1. Get actual exchange positions
|
||||
exchange_positions = self._get_actual_exchange_positions()
|
||||
|
||||
# 2. Get dashboard positions
|
||||
dashboard_positions = self._get_dashboard_positions()
|
||||
|
||||
# 3. Compare and sync
|
||||
for symbol in ['ETH/USDT', 'BTC/USDT']:
|
||||
sync_result = self._sync_symbol_position(
|
||||
symbol,
|
||||
exchange_positions.get(symbol),
|
||||
dashboard_positions.get(symbol)
|
||||
)
|
||||
sync_results[symbol] = sync_result
|
||||
|
||||
# 4. Update closed trades list from exchange
|
||||
self._sync_closed_trades()
|
||||
|
||||
return {
|
||||
'sync_time': datetime.now().isoformat(),
|
||||
'results': sync_results,
|
||||
'total_synced': len(sync_results),
|
||||
'issues_found': sum(1 for r in sync_results.values() if not r['in_sync'])
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in comprehensive position sync: {e}")
|
||||
return {'error': str(e)}
|
||||
|
||||
def _get_actual_exchange_positions(self) -> Dict[str, Dict]:
|
||||
"""Get actual positions from exchange account"""
|
||||
try:
|
||||
positions = {}
|
||||
|
||||
if not self.trading_executor:
|
||||
return positions
|
||||
|
||||
# Get account balances
|
||||
if hasattr(self.trading_executor, 'get_account_balance'):
|
||||
balances = self.trading_executor.get_account_balance()
|
||||
|
||||
for symbol in ['ETH/USDT', 'BTC/USDT']:
|
||||
# Parse symbol to get base asset
|
||||
base_asset = symbol.split('/')[0]
|
||||
|
||||
# Get balance for base asset
|
||||
base_balance = balances.get(base_asset, {}).get('total', 0.0)
|
||||
|
||||
if base_balance > 0.001: # Minimum threshold
|
||||
positions[symbol] = {
|
||||
'side': 'LONG',
|
||||
'size': base_balance,
|
||||
'value': base_balance * self._get_current_price(symbol),
|
||||
'source': 'exchange_balance'
|
||||
}
|
||||
|
||||
# Also check trading executor's position tracking
|
||||
if hasattr(self.trading_executor, 'get_positions'):
|
||||
executor_positions = self.trading_executor.get_positions()
|
||||
for symbol, position in executor_positions.items():
|
||||
if position and hasattr(position, 'quantity') and position.quantity > 0:
|
||||
positions[symbol] = {
|
||||
'side': position.side,
|
||||
'size': position.quantity,
|
||||
'entry_price': position.entry_price,
|
||||
'value': position.quantity * self._get_current_price(symbol),
|
||||
'source': 'executor_tracking'
|
||||
}
|
||||
|
||||
return positions
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting actual exchange positions: {e}")
|
||||
return {}
|
||||
|
||||
def _get_dashboard_positions(self) -> Dict[str, Dict]:
|
||||
"""Get positions as shown on dashboard"""
|
||||
try:
|
||||
positions = {}
|
||||
|
||||
# Get from dashboard's current_position
|
||||
if self.dashboard.current_position:
|
||||
symbol = self.dashboard.current_position.get('symbol', 'ETH/USDT')
|
||||
positions[symbol] = {
|
||||
'side': self.dashboard.current_position.get('side'),
|
||||
'size': self.dashboard.current_position.get('size'),
|
||||
'entry_price': self.dashboard.current_position.get('price'),
|
||||
'value': self.dashboard.current_position.get('size', 0) * self._get_current_price(symbol),
|
||||
'source': 'dashboard_display'
|
||||
}
|
||||
|
||||
return positions
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting dashboard positions: {e}")
|
||||
return {}
|
||||
|
||||
def _sync_symbol_position(self, symbol: str, exchange_pos: Optional[Dict], dashboard_pos: Optional[Dict]) -> Dict[str, Any]:
|
||||
"""Sync position for a specific symbol"""
|
||||
try:
|
||||
sync_result = {
|
||||
'symbol': symbol,
|
||||
'exchange_position': exchange_pos,
|
||||
'dashboard_position': dashboard_pos,
|
||||
'in_sync': True,
|
||||
'action_taken': 'none'
|
||||
}
|
||||
|
||||
# Case 1: Exchange has position, dashboard doesn't
|
||||
if exchange_pos and not dashboard_pos:
|
||||
logger.warning(f"SYNC ISSUE: Exchange has {symbol} position but dashboard shows none")
|
||||
|
||||
# Update dashboard to reflect exchange position
|
||||
self.dashboard.current_position = {
|
||||
'symbol': symbol,
|
||||
'side': exchange_pos['side'],
|
||||
'size': exchange_pos['size'],
|
||||
'price': exchange_pos.get('entry_price', self._get_current_price(symbol)),
|
||||
'entry_time': datetime.now(),
|
||||
'leverage': self.dashboard.current_leverage,
|
||||
'source': 'sync_correction'
|
||||
}
|
||||
|
||||
sync_result['in_sync'] = False
|
||||
sync_result['action_taken'] = 'updated_dashboard_from_exchange'
|
||||
|
||||
# Case 2: Dashboard has position, exchange doesn't
|
||||
elif dashboard_pos and not exchange_pos:
|
||||
logger.warning(f"SYNC ISSUE: Dashboard shows {symbol} position but exchange has none")
|
||||
|
||||
# Clear dashboard position
|
||||
self.dashboard.current_position = None
|
||||
|
||||
sync_result['in_sync'] = False
|
||||
sync_result['action_taken'] = 'cleared_dashboard_position'
|
||||
|
||||
# Case 3: Both have positions but they differ
|
||||
elif exchange_pos and dashboard_pos:
|
||||
if (exchange_pos['side'] != dashboard_pos['side'] or
|
||||
abs(exchange_pos['size'] - dashboard_pos['size']) > 0.001):
|
||||
|
||||
logger.warning(f"SYNC ISSUE: {symbol} position mismatch - Exchange: {exchange_pos['side']} {exchange_pos['size']:.3f}, Dashboard: {dashboard_pos['side']} {dashboard_pos['size']:.3f}")
|
||||
|
||||
# Update dashboard to match exchange
|
||||
self.dashboard.current_position.update({
|
||||
'side': exchange_pos['side'],
|
||||
'size': exchange_pos['size'],
|
||||
'price': exchange_pos.get('entry_price', dashboard_pos['entry_price'])
|
||||
})
|
||||
|
||||
sync_result['in_sync'] = False
|
||||
sync_result['action_taken'] = 'updated_dashboard_to_match_exchange'
|
||||
|
||||
return sync_result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error syncing position for {symbol}: {e}")
|
||||
return {'symbol': symbol, 'error': str(e), 'in_sync': False}
|
||||
|
||||
def _sync_closed_trades(self):
|
||||
"""Sync closed trades list with actual exchange trade history"""
|
||||
try:
|
||||
if not self.trading_executor:
|
||||
return
|
||||
|
||||
# Get trade history from executor
|
||||
if hasattr(self.trading_executor, 'get_trade_history'):
|
||||
executor_trades = self.trading_executor.get_trade_history()
|
||||
|
||||
# Clear and rebuild closed_trades list
|
||||
self.dashboard.closed_trades = []
|
||||
|
||||
for trade in executor_trades:
|
||||
# Convert to dashboard format
|
||||
trade_record = {
|
||||
'symbol': getattr(trade, 'symbol', 'ETH/USDT'),
|
||||
'side': getattr(trade, 'side', 'UNKNOWN'),
|
||||
'quantity': getattr(trade, 'quantity', 0),
|
||||
'entry_price': getattr(trade, 'entry_price', 0),
|
||||
'exit_price': getattr(trade, 'exit_price', 0),
|
||||
'entry_time': getattr(trade, 'entry_time', datetime.now()),
|
||||
'exit_time': getattr(trade, 'exit_time', datetime.now()),
|
||||
'pnl': getattr(trade, 'pnl', 0),
|
||||
'fees': getattr(trade, 'fees', 0),
|
||||
'confidence': getattr(trade, 'confidence', 1.0),
|
||||
'trade_type': 'synced_from_executor'
|
||||
}
|
||||
|
||||
# Only add completed trades (with exit_time)
|
||||
if trade_record['exit_time']:
|
||||
self.dashboard.closed_trades.append(trade_record)
|
||||
|
||||
# Update session PnL
|
||||
self.dashboard.session_pnl = sum(trade['pnl'] for trade in self.dashboard.closed_trades)
|
||||
|
||||
logger.info(f"Synced {len(self.dashboard.closed_trades)} closed trades from executor")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error syncing closed trades: {e}")
|
||||
|
||||
def _get_current_price(self, symbol: str) -> float:
|
||||
"""Get current price for a symbol"""
|
||||
try:
|
||||
return self.dashboard._get_current_price(symbol) or 3500.0
|
||||
except:
|
||||
return 3500.0 # Fallback price
|
||||
|
||||
def should_sync(self) -> bool:
|
||||
"""Check if sync is needed based on time interval"""
|
||||
current_time = time.time()
|
||||
if current_time - self.last_sync_time >= self.sync_interval:
|
||||
self.last_sync_time = current_time
|
||||
return True
|
||||
return False
|
||||
|
||||
def create_sync_status_display(self) -> Dict[str, Any]:
|
||||
"""Create detailed sync status for dashboard display"""
|
||||
try:
|
||||
# Get current sync status
|
||||
sync_results = self.sync_all_positions()
|
||||
|
||||
# Create display-friendly format
|
||||
status_display = {
|
||||
'last_sync': datetime.now().strftime('%H:%M:%S'),
|
||||
'sync_healthy': sync_results.get('issues_found', 0) == 0,
|
||||
'positions': {},
|
||||
'closed_trades_count': len(self.dashboard.closed_trades),
|
||||
'session_pnl': self.dashboard.session_pnl
|
||||
}
|
||||
|
||||
# Add position details
|
||||
for symbol, result in sync_results.get('results', {}).items():
|
||||
status_display['positions'][symbol] = {
|
||||
'in_sync': result['in_sync'],
|
||||
'action_taken': result.get('action_taken', 'none'),
|
||||
'has_exchange_position': result['exchange_position'] is not None,
|
||||
'has_dashboard_position': result['dashboard_position'] is not None
|
||||
}
|
||||
|
||||
return status_display
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating sync status display: {e}")
|
||||
return {'error': str(e)}
|
||||
|
||||
|
||||
# Integration with existing dashboard
|
||||
def integrate_enhanced_sync(dashboard):
|
||||
"""Integrate enhanced sync with existing dashboard"""
|
||||
|
||||
# Create enhanced sync instance
|
||||
enhanced_sync = EnhancedPositionSync(dashboard.trading_executor, dashboard)
|
||||
|
||||
# Add to dashboard
|
||||
dashboard.enhanced_sync = enhanced_sync
|
||||
|
||||
# Modify existing metrics update to include sync
|
||||
original_update_metrics = dashboard.update_metrics
|
||||
|
||||
def enhanced_update_metrics(n):
|
||||
"""Enhanced metrics update with position sync"""
|
||||
try:
|
||||
# Perform periodic sync
|
||||
if enhanced_sync.should_sync():
|
||||
sync_results = enhanced_sync.sync_all_positions()
|
||||
if sync_results.get('issues_found', 0) > 0:
|
||||
logger.info(f"Position sync performed: {sync_results['issues_found']} issues corrected")
|
||||
|
||||
# Call original metrics update
|
||||
return original_update_metrics(n)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in enhanced metrics update: {e}")
|
||||
return original_update_metrics(n)
|
||||
|
||||
# Replace the update method
|
||||
dashboard.update_metrics = enhanced_update_metrics
|
||||
|
||||
return enhanced_sync
|
Reference in New Issue
Block a user