Compare commits
2 Commits
0bb4409c30
...
9c56ea238e
Author | SHA1 | Date | |
---|---|---|---|
9c56ea238e | |||
a2c07a1f3e |
@ -22,7 +22,8 @@
|
|||||||
- Ensure thread safety for cache access
|
- Ensure thread safety for cache access
|
||||||
- _Requirements: 1.6, 8.1_
|
- _Requirements: 1.6, 8.1_
|
||||||
|
|
||||||
- [ ] 1.3. Enhance real-time data streaming
|
- [-] 1.3. Enhance real-time data streaming
|
||||||
|
|
||||||
- Improve WebSocket connection management
|
- Improve WebSocket connection management
|
||||||
- Implement reconnection strategies
|
- Implement reconnection strategies
|
||||||
- Add data validation to ensure data integrity
|
- Add data validation to ensure data integrity
|
||||||
|
@ -2653,6 +2653,7 @@ class DataProvider:
|
|||||||
# Significantly reduced frequency for REST API fallback only
|
# Significantly reduced frequency for REST API fallback only
|
||||||
def collect_symbol_data(symbol):
|
def collect_symbol_data(symbol):
|
||||||
rest_api_fallback_count = 0
|
rest_api_fallback_count = 0
|
||||||
|
last_rest_api_call = 0 # Track last REST API call time
|
||||||
while self.cob_collection_active:
|
while self.cob_collection_active:
|
||||||
try:
|
try:
|
||||||
# PRIORITY 1: Try to use WebSocket data first
|
# PRIORITY 1: Try to use WebSocket data first
|
||||||
@ -2664,13 +2665,20 @@ class DataProvider:
|
|||||||
# Much longer sleep since WebSocket provides real-time data
|
# Much longer sleep since WebSocket provides real-time data
|
||||||
time.sleep(10.0) # Only check every 10 seconds when WS is working
|
time.sleep(10.0) # Only check every 10 seconds when WS is working
|
||||||
else:
|
else:
|
||||||
# FALLBACK: Only use REST API if WebSocket fails
|
# FALLBACK: Only use REST API if WebSocket fails AND rate limit allows
|
||||||
rest_api_fallback_count += 1
|
rest_api_fallback_count += 1
|
||||||
if rest_api_fallback_count <= 3: # Limited fallback attempts
|
current_time = time.time()
|
||||||
logger.warning(f"WebSocket COB data unavailable for {symbol}, using REST API fallback #{rest_api_fallback_count}")
|
|
||||||
self._collect_cob_data_for_symbol(symbol)
|
# STRICT RATE LIMITING: Maximum 1 REST API call per second
|
||||||
|
if current_time - last_rest_api_call >= 1.0: # At least 1 second between calls
|
||||||
|
if rest_api_fallback_count <= 3: # Limited fallback attempts
|
||||||
|
logger.warning(f"WebSocket COB data unavailable for {symbol}, using REST API fallback #{rest_api_fallback_count}")
|
||||||
|
self._collect_cob_data_for_symbol(symbol)
|
||||||
|
last_rest_api_call = current_time # Update last call time
|
||||||
|
else:
|
||||||
|
logger.debug(f"Skipping REST API for {symbol} to prevent rate limits (WS data preferred)")
|
||||||
else:
|
else:
|
||||||
logger.debug(f"Skipping REST API for {symbol} to prevent rate limits (WS data preferred)")
|
logger.debug(f"Rate limiting REST API for {symbol} - waiting {1.0 - (current_time - last_rest_api_call):.1f}s")
|
||||||
|
|
||||||
# Much longer sleep when using REST API fallback
|
# Much longer sleep when using REST API fallback
|
||||||
time.sleep(30.0) # 30 seconds between REST calls
|
time.sleep(30.0) # 30 seconds between REST calls
|
||||||
@ -2694,49 +2702,35 @@ class DataProvider:
|
|||||||
for thread in threads:
|
for thread in threads:
|
||||||
thread.join(timeout=1)
|
thread.join(timeout=1)
|
||||||
|
|
||||||
def _get_websocket_cob_data(self, symbol: str) -> Optional[dict]:
|
def _get_websocket_cob_data(self, symbol: str) -> Optional[Dict]:
|
||||||
"""Get COB data from WebSocket streams (rate limit free)"""
|
"""Get COB data from WebSocket streams (primary source)"""
|
||||||
try:
|
try:
|
||||||
binance_symbol = symbol.replace('/', '').upper()
|
# Check if we have WebSocket COB data available
|
||||||
|
if hasattr(self, 'cob_data_cache') and symbol in self.cob_data_cache:
|
||||||
|
cached_data = self.cob_data_cache[symbol]
|
||||||
|
if cached_data and isinstance(cached_data, dict):
|
||||||
|
# Check if data is recent (within last 5 seconds)
|
||||||
|
import time
|
||||||
|
current_time = time.time()
|
||||||
|
data_age = current_time - cached_data.get('timestamp', 0)
|
||||||
|
|
||||||
|
if data_age < 5.0: # Data is fresh
|
||||||
|
logger.debug(f"Using WebSocket COB data for {symbol} (age: {data_age:.1f}s)")
|
||||||
|
return cached_data
|
||||||
|
else:
|
||||||
|
logger.debug(f"WebSocket COB data for {symbol} is stale (age: {data_age:.1f}s)")
|
||||||
|
|
||||||
# Check if we have recent WebSocket tick data
|
# Check if multi-exchange COB provider has WebSocket data
|
||||||
if binance_symbol in self.tick_buffers and len(self.tick_buffers[binance_symbol]) > 10:
|
if hasattr(self, 'multi_exchange_cob_provider') and self.multi_exchange_cob_provider:
|
||||||
recent_ticks = list(self.tick_buffers[binance_symbol])[-50:] # Last 50 ticks
|
try:
|
||||||
|
cob_data = self.multi_exchange_cob_provider.get_latest_cob_data(symbol)
|
||||||
if recent_ticks:
|
if cob_data and isinstance(cob_data, dict):
|
||||||
# Calculate COB data from WebSocket ticks
|
logger.debug(f"Using multi-exchange WebSocket COB data for {symbol}")
|
||||||
latest_tick = recent_ticks[-1]
|
return cob_data
|
||||||
|
except Exception as e:
|
||||||
# Calculate bid/ask liquidity from recent tick patterns
|
logger.debug(f"Error getting multi-exchange COB data for {symbol}: {e}")
|
||||||
buy_volume = sum(tick.volume for tick in recent_ticks if tick.side == 'buy')
|
|
||||||
sell_volume = sum(tick.volume for tick in recent_ticks if tick.side == 'sell')
|
|
||||||
total_volume = buy_volume + sell_volume
|
|
||||||
|
|
||||||
# Calculate metrics
|
|
||||||
imbalance = (buy_volume - sell_volume) / total_volume if total_volume > 0 else 0
|
|
||||||
avg_price = sum(tick.price for tick in recent_ticks) / len(recent_ticks)
|
|
||||||
|
|
||||||
# Create synthetic COB snapshot from WebSocket data
|
|
||||||
cob_snapshot = {
|
|
||||||
'symbol': symbol,
|
|
||||||
'timestamp': datetime.now(),
|
|
||||||
'source': 'websocket', # Mark as WebSocket source
|
|
||||||
'stats': {
|
|
||||||
'mid_price': latest_tick.price,
|
|
||||||
'avg_price': avg_price,
|
|
||||||
'imbalance': imbalance,
|
|
||||||
'buy_volume': buy_volume,
|
|
||||||
'sell_volume': sell_volume,
|
|
||||||
'total_volume': total_volume,
|
|
||||||
'tick_count': len(recent_ticks),
|
|
||||||
'best_bid': latest_tick.price - 0.01, # Approximate
|
|
||||||
'best_ask': latest_tick.price + 0.01, # Approximate
|
|
||||||
'spread_bps': 10 # Approximate spread
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return cob_snapshot
|
|
||||||
|
|
||||||
|
logger.debug(f"No WebSocket COB data available for {symbol}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -159,187 +159,40 @@ class MultiExchangeCOBProvider:
|
|||||||
to create a consolidated view of market liquidity and pricing.
|
to create a consolidated view of market liquidity and pricing.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, symbols: Optional[List[str]] = None, bucket_size_bps: float = 1.0):
|
def __init__(self, symbols: List[str], exchange_configs: Dict[str, ExchangeConfig]):
|
||||||
"""
|
"""Initialize multi-exchange COB provider"""
|
||||||
Initialize Multi-Exchange COB Provider
|
self.symbols = symbols
|
||||||
|
self.exchange_configs = exchange_configs
|
||||||
Args:
|
self.active_exchanges = ['binance'] # Focus on Binance for now
|
||||||
symbols: List of symbols to monitor (e.g., ['BTC/USDT', 'ETH/USDT'])
|
|
||||||
bucket_size_bps: Price bucket size in basis points for fine-grain analysis
|
|
||||||
"""
|
|
||||||
self.symbols = symbols or ['BTC/USDT', 'ETH/USDT']
|
|
||||||
self.bucket_size_bps = bucket_size_bps
|
|
||||||
self.bucket_update_frequency = 100 # ms
|
|
||||||
self.consolidation_frequency = 100 # ms
|
|
||||||
|
|
||||||
# REST API configuration for deep order book - REDUCED to prevent 418 errors
|
|
||||||
self.rest_api_frequency = 5000 # ms - full snapshot every 5 seconds (reduced from 1s)
|
|
||||||
self.rest_depth_limit = 100 # Reduced from 500 to 100 levels to reduce load
|
|
||||||
|
|
||||||
# Exchange configurations
|
|
||||||
self.exchange_configs = self._initialize_exchange_configs()
|
|
||||||
|
|
||||||
# Rate limiter for REST API calls
|
|
||||||
self.rest_rate_limiter = SimpleRateLimiter(requests_per_second=2.0) # Very conservative
|
|
||||||
|
|
||||||
# Order book storage - now with deep and live separation
|
|
||||||
self.exchange_order_books = {
|
|
||||||
symbol: {
|
|
||||||
exchange.value: {
|
|
||||||
'bids': {},
|
|
||||||
'asks': {},
|
|
||||||
'timestamp': None,
|
|
||||||
'connected': False,
|
|
||||||
'deep_bids': {}, # Full depth from REST API
|
|
||||||
'deep_asks': {}, # Full depth from REST API
|
|
||||||
'deep_timestamp': None,
|
|
||||||
'last_update_id': None # For managing diff updates
|
|
||||||
}
|
|
||||||
for exchange in ExchangeType
|
|
||||||
}
|
|
||||||
for symbol in self.symbols
|
|
||||||
}
|
|
||||||
|
|
||||||
# Consolidated order books
|
|
||||||
self.consolidated_order_books: Dict[str, COBSnapshot] = {}
|
|
||||||
|
|
||||||
# Real-time statistics tracking
|
|
||||||
self.realtime_stats: Dict[str, Dict] = {symbol: {} for symbol in self.symbols}
|
|
||||||
self.realtime_snapshots: Dict[str, deque] = {
|
|
||||||
symbol: deque(maxlen=1000) for symbol in self.symbols
|
|
||||||
}
|
|
||||||
|
|
||||||
# Session tracking for SVP
|
|
||||||
self.session_start_time = datetime.now()
|
|
||||||
self.session_trades: Dict[str, List[Dict]] = {symbol: [] for symbol in self.symbols}
|
|
||||||
self.svp_cache: Dict[str, Dict] = {symbol: {} for symbol in self.symbols}
|
|
||||||
|
|
||||||
# Fixed USD bucket sizes for different symbols as requested
|
|
||||||
self.fixed_usd_buckets = {
|
|
||||||
'BTC/USDT': 10.0, # $10 buckets for BTC
|
|
||||||
'ETH/USDT': 1.0, # $1 buckets for ETH
|
|
||||||
}
|
|
||||||
|
|
||||||
# WebSocket management
|
|
||||||
self.is_streaming = False
|
self.is_streaming = False
|
||||||
self.active_exchanges = ['binance'] # Start with Binance only
|
self.cob_data_cache = {} # Cache for COB data
|
||||||
|
self.cob_subscribers = [] # List of callback functions
|
||||||
|
|
||||||
# Callbacks for real-time updates
|
# Rate limiting for REST API fallback
|
||||||
self.cob_update_callbacks = []
|
self.last_rest_api_call = 0
|
||||||
self.bucket_update_callbacks = []
|
self.rest_api_call_count = 0
|
||||||
|
|
||||||
# Performance tracking
|
logger.info(f"Multi-exchange COB provider initialized for symbols: {symbols}")
|
||||||
self.exchange_update_counts = {exchange.value: 0 for exchange in ExchangeType}
|
|
||||||
self.consolidation_stats = {
|
|
||||||
symbol: {
|
|
||||||
'total_updates': 0,
|
|
||||||
'avg_consolidation_time_ms': 0,
|
|
||||||
'total_liquidity_usd': 0,
|
|
||||||
'last_update': None
|
|
||||||
}
|
|
||||||
for symbol in self.symbols
|
|
||||||
}
|
|
||||||
self.processing_times = {'consolidation': deque(maxlen=100), 'rest_api': deque(maxlen=100)}
|
|
||||||
|
|
||||||
# Thread safety
|
|
||||||
self.data_lock = asyncio.Lock()
|
|
||||||
|
|
||||||
# Initialize aiohttp session and connector to None, will be set up in start_streaming
|
|
||||||
self.session: Optional[aiohttp.ClientSession] = None
|
|
||||||
self.connector: Optional[aiohttp.TCPConnector] = None
|
|
||||||
self.rest_session: Optional[aiohttp.ClientSession] = None # Added for explicit None initialization
|
|
||||||
|
|
||||||
# Create REST API session
|
|
||||||
# Fix for Windows aiodns issue - use ThreadedResolver instead
|
|
||||||
connector = aiohttp.TCPConnector(
|
|
||||||
resolver=aiohttp.ThreadedResolver(),
|
|
||||||
use_dns_cache=False
|
|
||||||
)
|
|
||||||
self.rest_session = aiohttp.ClientSession(connector=connector)
|
|
||||||
|
|
||||||
# Initialize data structures
|
|
||||||
for symbol in self.symbols:
|
|
||||||
self.exchange_order_books[symbol]['binance']['connected'] = False
|
|
||||||
self.exchange_order_books[symbol]['binance']['deep_bids'] = {}
|
|
||||||
self.exchange_order_books[symbol]['binance']['deep_asks'] = {}
|
|
||||||
self.exchange_order_books[symbol]['binance']['deep_timestamp'] = None
|
|
||||||
self.exchange_order_books[symbol]['binance']['last_update_id'] = None
|
|
||||||
self.realtime_snapshots[symbol].append(COBSnapshot(
|
|
||||||
symbol=symbol,
|
|
||||||
timestamp=datetime.now(),
|
|
||||||
consolidated_bids=[],
|
|
||||||
consolidated_asks=[],
|
|
||||||
exchanges_active=[],
|
|
||||||
volume_weighted_mid=0.0,
|
|
||||||
total_bid_liquidity=0.0,
|
|
||||||
total_ask_liquidity=0.0,
|
|
||||||
spread_bps=0.0,
|
|
||||||
liquidity_imbalance=0.0,
|
|
||||||
price_buckets={}
|
|
||||||
))
|
|
||||||
|
|
||||||
logger.info(f"Multi-Exchange COB Provider initialized")
|
|
||||||
logger.info(f"Symbols: {self.symbols}")
|
|
||||||
logger.info(f"Bucket size: {bucket_size_bps} bps")
|
|
||||||
logger.info(f"Fixed USD buckets: {self.fixed_usd_buckets}")
|
|
||||||
logger.info(f"Configured exchanges: {[e.value for e in ExchangeType]}")
|
|
||||||
|
|
||||||
def _initialize_exchange_configs(self) -> Dict[str, ExchangeConfig]:
|
def subscribe_to_cob_updates(self, callback):
|
||||||
"""Initialize exchange configurations"""
|
"""Subscribe to COB data updates"""
|
||||||
configs = {}
|
self.cob_subscribers.append(callback)
|
||||||
|
logger.debug(f"Added COB subscriber, total: {len(self.cob_subscribers)}")
|
||||||
# Binance configuration
|
|
||||||
configs[ExchangeType.BINANCE.value] = ExchangeConfig(
|
|
||||||
exchange_type=ExchangeType.BINANCE,
|
|
||||||
weight=0.3, # Higher weight due to volume
|
|
||||||
websocket_url="wss://stream.binance.com:9443/ws/",
|
|
||||||
rest_api_url="https://api.binance.com",
|
|
||||||
symbols_mapping={'BTC/USDT': 'BTCUSDT', 'ETH/USDT': 'ETHUSDT'},
|
|
||||||
rate_limits={'requests_per_minute': 1200, 'weight_per_minute': 6000}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Coinbase Pro configuration
|
|
||||||
configs[ExchangeType.COINBASE.value] = ExchangeConfig(
|
|
||||||
exchange_type=ExchangeType.COINBASE,
|
|
||||||
weight=0.25,
|
|
||||||
websocket_url="wss://ws-feed.exchange.coinbase.com",
|
|
||||||
rest_api_url="https://api.exchange.coinbase.com",
|
|
||||||
symbols_mapping={'BTC/USDT': 'BTC-USD', 'ETH/USDT': 'ETH-USD'},
|
|
||||||
rate_limits={'requests_per_minute': 600}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Kraken configuration
|
|
||||||
configs[ExchangeType.KRAKEN.value] = ExchangeConfig(
|
|
||||||
exchange_type=ExchangeType.KRAKEN,
|
|
||||||
weight=0.2,
|
|
||||||
websocket_url="wss://ws.kraken.com",
|
|
||||||
rest_api_url="https://api.kraken.com",
|
|
||||||
symbols_mapping={'BTC/USDT': 'XBT/USDT', 'ETH/USDT': 'ETH/USDT'},
|
|
||||||
rate_limits={'requests_per_minute': 900}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Huobi configuration
|
|
||||||
configs[ExchangeType.HUOBI.value] = ExchangeConfig(
|
|
||||||
exchange_type=ExchangeType.HUOBI,
|
|
||||||
weight=0.15,
|
|
||||||
websocket_url="wss://api.huobi.pro/ws",
|
|
||||||
rest_api_url="https://api.huobi.pro",
|
|
||||||
symbols_mapping={'BTC/USDT': 'btcusdt', 'ETH/USDT': 'ethusdt'},
|
|
||||||
rate_limits={'requests_per_minute': 2000}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Bitfinex configuration
|
|
||||||
configs[ExchangeType.BITFINEX.value] = ExchangeConfig(
|
|
||||||
exchange_type=ExchangeType.BITFINEX,
|
|
||||||
weight=0.1,
|
|
||||||
websocket_url="wss://api-pub.bitfinex.com/ws/2",
|
|
||||||
rest_api_url="https://api-pub.bitfinex.com",
|
|
||||||
symbols_mapping={'BTC/USDT': 'tBTCUST', 'ETH/USDT': 'tETHUST'},
|
|
||||||
rate_limits={'requests_per_minute': 1000}
|
|
||||||
)
|
|
||||||
|
|
||||||
return configs
|
|
||||||
|
|
||||||
|
async def _notify_cob_subscribers(self, symbol: str, cob_snapshot: Dict):
|
||||||
|
"""Notify all subscribers of COB data updates"""
|
||||||
|
try:
|
||||||
|
for callback in self.cob_subscribers:
|
||||||
|
try:
|
||||||
|
if asyncio.iscoroutinefunction(callback):
|
||||||
|
await callback(symbol, cob_snapshot)
|
||||||
|
else:
|
||||||
|
callback(symbol, cob_snapshot)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error in COB subscriber callback: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error notifying COB subscribers: {e}")
|
||||||
|
|
||||||
async def start_streaming(self):
|
async def start_streaming(self):
|
||||||
"""Start real-time order book streaming from all configured exchanges using only WebSocket"""
|
"""Start real-time order book streaming from all configured exchanges using only WebSocket"""
|
||||||
logger.info(f"Starting COB streaming for symbols: {self.symbols}")
|
logger.info(f"Starting COB streaming for symbols: {self.symbols}")
|
||||||
@ -1667,23 +1520,97 @@ class MultiExchangeCOBProvider:
|
|||||||
async with websockets_connect(ws_url) as websocket:
|
async with websockets_connect(ws_url) as websocket:
|
||||||
logger.info(f"Connected to Binance full depth stream for {symbol}")
|
logger.info(f"Connected to Binance full depth stream for {symbol}")
|
||||||
|
|
||||||
async for message in websocket:
|
while self.is_streaming:
|
||||||
if not self.is_streaming:
|
|
||||||
break
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
message = await websocket.recv()
|
||||||
data = json.loads(message)
|
data = json.loads(message)
|
||||||
await self._process_binance_full_depth(symbol, data)
|
|
||||||
|
|
||||||
except json.JSONDecodeError as e:
|
# Process full depth data
|
||||||
logger.error(f"Error parsing Binance full depth message: {e}")
|
if 'bids' in data and 'asks' in data:
|
||||||
|
# Create comprehensive COB snapshot
|
||||||
|
cob_snapshot = {
|
||||||
|
'symbol': symbol,
|
||||||
|
'timestamp': time.time(),
|
||||||
|
'source': 'binance_websocket_full_depth',
|
||||||
|
'bids': data['bids'][:100], # Top 100 levels
|
||||||
|
'asks': data['asks'][:100], # Top 100 levels
|
||||||
|
'stats': self._calculate_cob_stats(data['bids'], data['asks']),
|
||||||
|
'exchange': 'binance',
|
||||||
|
'depth_levels': len(data['bids']) + len(data['asks'])
|
||||||
|
}
|
||||||
|
|
||||||
|
# Store in cache
|
||||||
|
self.cob_data_cache[symbol] = cob_snapshot
|
||||||
|
|
||||||
|
# Notify subscribers
|
||||||
|
await self._notify_cob_subscribers(symbol, cob_snapshot)
|
||||||
|
|
||||||
|
logger.debug(f"Full depth COB update for {symbol}: {len(data['bids'])} bids, {len(data['asks'])} asks")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error processing Binance full depth: {e}")
|
if "ConnectionClosed" in str(e) or "connection closed" in str(e).lower():
|
||||||
|
logger.warning(f"Binance full depth WebSocket connection closed for {symbol}")
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error processing full depth data for {symbol}: {e}")
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Binance full depth WebSocket error for {symbol}: {e}")
|
logger.error(f"Error in Binance full depth stream for {symbol}: {e}")
|
||||||
finally:
|
|
||||||
logger.info(f"Disconnected from Binance full depth stream for {symbol}")
|
def _calculate_cob_stats(self, bids: List, asks: List) -> Dict:
|
||||||
|
"""Calculate COB statistics from order book data"""
|
||||||
|
try:
|
||||||
|
if not bids or not asks:
|
||||||
|
return {
|
||||||
|
'mid_price': 0,
|
||||||
|
'spread_bps': 0,
|
||||||
|
'imbalance': 0,
|
||||||
|
'bid_liquidity': 0,
|
||||||
|
'ask_liquidity': 0
|
||||||
|
}
|
||||||
|
|
||||||
|
# Convert string values to float
|
||||||
|
bid_prices = [float(bid[0]) for bid in bids]
|
||||||
|
bid_sizes = [float(bid[1]) for bid in bids]
|
||||||
|
ask_prices = [float(ask[0]) for ask in asks]
|
||||||
|
ask_sizes = [float(ask[1]) for ask in asks]
|
||||||
|
|
||||||
|
# Calculate best bid/ask
|
||||||
|
best_bid = max(bid_prices)
|
||||||
|
best_ask = min(ask_prices)
|
||||||
|
mid_price = (best_bid + best_ask) / 2
|
||||||
|
|
||||||
|
# Calculate spread
|
||||||
|
spread_bps = ((best_ask - best_bid) / mid_price) * 10000 if mid_price > 0 else 0
|
||||||
|
|
||||||
|
# Calculate liquidity
|
||||||
|
bid_liquidity = sum(bid_sizes[:20]) # Top 20 levels
|
||||||
|
ask_liquidity = sum(ask_sizes[:20]) # Top 20 levels
|
||||||
|
total_liquidity = bid_liquidity + ask_liquidity
|
||||||
|
|
||||||
|
# Calculate imbalance
|
||||||
|
imbalance = (bid_liquidity - ask_liquidity) / total_liquidity if total_liquidity > 0 else 0
|
||||||
|
|
||||||
|
return {
|
||||||
|
'mid_price': mid_price,
|
||||||
|
'spread_bps': spread_bps,
|
||||||
|
'imbalance': imbalance,
|
||||||
|
'bid_liquidity': bid_liquidity,
|
||||||
|
'ask_liquidity': ask_liquidity,
|
||||||
|
'best_bid': best_bid,
|
||||||
|
'best_ask': best_ask
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error calculating COB stats: {e}")
|
||||||
|
return {
|
||||||
|
'mid_price': 0,
|
||||||
|
'spread_bps': 0,
|
||||||
|
'imbalance': 0,
|
||||||
|
'bid_liquidity': 0,
|
||||||
|
'ask_liquidity': 0
|
||||||
|
}
|
||||||
|
|
||||||
async def _stream_binance_book_ticker(self, symbol: str):
|
async def _stream_binance_book_ticker(self, symbol: str):
|
||||||
"""Stream best bid/ask prices from Binance WebSocket"""
|
"""Stream best bid/ask prices from Binance WebSocket"""
|
||||||
@ -1909,4 +1836,14 @@ class MultiExchangeCOBProvider:
|
|||||||
})
|
})
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error adding aggregate trade to analysis for {symbol}: {e}")
|
logger.error(f"Error adding aggregate trade to analysis for {symbol}: {e}")
|
||||||
|
|
||||||
|
def get_latest_cob_data(self, symbol: str) -> Optional[Dict]:
|
||||||
|
"""Get latest COB data for a symbol from cache"""
|
||||||
|
try:
|
||||||
|
if symbol in self.cob_data_cache:
|
||||||
|
return self.cob_data_cache[symbol]
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting latest COB data for {symbol}: {e}")
|
||||||
|
return None
|
@ -1985,6 +1985,53 @@ class TradingOrchestrator:
|
|||||||
self.trading_executor = trading_executor
|
self.trading_executor = trading_executor
|
||||||
logger.info("Trading executor set for position tracking and P&L feedback")
|
logger.info("Trading executor set for position tracking and P&L feedback")
|
||||||
|
|
||||||
|
def get_profitability_reward_multiplier(self) -> float:
|
||||||
|
"""Get the current profitability reward multiplier from trading executor
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
float: Current profitability reward multiplier (0.0 to 2.0)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if self.trading_executor and hasattr(self.trading_executor, 'get_profitability_reward_multiplier'):
|
||||||
|
multiplier = self.trading_executor.get_profitability_reward_multiplier()
|
||||||
|
logger.debug(f"Current profitability reward multiplier: {multiplier:.2f}")
|
||||||
|
return multiplier
|
||||||
|
return 0.0
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting profitability reward multiplier: {e}")
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
def calculate_enhanced_reward(self, base_pnl: float, confidence: float = 1.0) -> float:
|
||||||
|
"""Calculate enhanced reward with profitability multiplier
|
||||||
|
|
||||||
|
Args:
|
||||||
|
base_pnl: Base P&L from the trade
|
||||||
|
confidence: Confidence level of the prediction (0.0 to 1.0)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
float: Enhanced reward with profitability multiplier applied
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Get the dynamic profitability multiplier
|
||||||
|
profitability_multiplier = self.get_profitability_reward_multiplier()
|
||||||
|
|
||||||
|
# Base reward is the P&L
|
||||||
|
base_reward = base_pnl
|
||||||
|
|
||||||
|
# Apply profitability multiplier only to positive P&L (profitable trades)
|
||||||
|
if base_pnl > 0 and profitability_multiplier > 0:
|
||||||
|
# Enhance profitable trades with the multiplier
|
||||||
|
enhanced_reward = base_pnl * (1.0 + profitability_multiplier)
|
||||||
|
logger.debug(f"Enhanced reward: ${base_pnl:.2f} → ${enhanced_reward:.2f} (multiplier: {profitability_multiplier:.2f})")
|
||||||
|
return enhanced_reward
|
||||||
|
else:
|
||||||
|
# No enhancement for losing trades or when multiplier is 0
|
||||||
|
return base_reward
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error calculating enhanced reward: {e}")
|
||||||
|
return base_pnl
|
||||||
|
|
||||||
def _check_signal_confirmation(self, symbol: str, signal_data: Dict) -> Optional[str]:
|
def _check_signal_confirmation(self, symbol: str, signal_data: Dict) -> Optional[str]:
|
||||||
"""Check if we have enough signal confirmations for trend confirmation with rate limiting"""
|
"""Check if we have enough signal confirmations for trend confirmation with rate limiting"""
|
||||||
try:
|
try:
|
||||||
|
@ -176,13 +176,25 @@ class TradingExecutor:
|
|||||||
self.simulation_balance = self.trading_config.get('simulation_account_usd', 100.0)
|
self.simulation_balance = self.trading_config.get('simulation_account_usd', 100.0)
|
||||||
self.simulation_positions = {} # symbol -> position data with real entry prices
|
self.simulation_positions = {} # symbol -> position data with real entry prices
|
||||||
|
|
||||||
# Trading fees configuration (0.1% for both open and close)
|
# Trading fees configuration (0.1% for both open and close - REVERTED TO NORMAL)
|
||||||
self.trading_fees = {
|
self.trading_fees = {
|
||||||
'open_fee_percent': 0.001, # 0.1% fee when opening position
|
'open_fee_percent': 0.001, # 0.1% fee when opening position
|
||||||
'close_fee_percent': 0.001, # 0.1% fee when closing position
|
'close_fee_percent': 0.001, # 0.1% fee when closing position
|
||||||
'total_round_trip_fee': 0.002 # 0.2% total for round trip
|
'total_round_trip_fee': 0.002 # 0.2% total for round trip
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Dynamic profitability reward parameter - starts at 0, adjusts based on success rate
|
||||||
|
self.profitability_reward_multiplier = 0.0 # Starts at 0, can be increased
|
||||||
|
self.min_profitability_multiplier = 0.0 # Minimum value
|
||||||
|
self.max_profitability_multiplier = 2.0 # Maximum 2x multiplier
|
||||||
|
self.profitability_adjustment_step = 0.1 # Adjust by 0.1 each time
|
||||||
|
|
||||||
|
# Success rate tracking for profitability adjustment
|
||||||
|
self.recent_trades_window = 20 # Look at last 20 trades
|
||||||
|
self.success_rate_increase_threshold = 0.60 # Increase multiplier if >60% success
|
||||||
|
self.success_rate_decrease_threshold = 0.51 # Decrease multiplier if <51% success
|
||||||
|
self.last_profitability_adjustment = datetime.now()
|
||||||
|
|
||||||
logger.info(f"TradingExecutor initialized - Trading: {self.trading_enabled}, Mode: {self.trading_mode}")
|
logger.info(f"TradingExecutor initialized - Trading: {self.trading_enabled}, Mode: {self.trading_mode}")
|
||||||
logger.info(f"Simulation balance: ${self.simulation_balance:.2f}")
|
logger.info(f"Simulation balance: ${self.simulation_balance:.2f}")
|
||||||
|
|
||||||
@ -622,6 +634,83 @@ class TradingExecutor:
|
|||||||
logger.error(f"Error cancelling open orders for {symbol}: {e}")
|
logger.error(f"Error cancelling open orders for {symbol}: {e}")
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
def _calculate_recent_success_rate(self) -> float:
|
||||||
|
"""Calculate success rate of recent closed trades
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
float: Success rate (0.0 to 1.0) of recent trades
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if len(self.trade_records) < 5: # Need at least 5 trades
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
# Get recent trades (up to the window size)
|
||||||
|
recent_trades = self.trade_records[-self.recent_trades_window:]
|
||||||
|
|
||||||
|
if not recent_trades:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
# Count winning trades (net PnL > 0)
|
||||||
|
winning_trades = sum(1 for trade in recent_trades if trade.net_pnl > 0)
|
||||||
|
success_rate = winning_trades / len(recent_trades)
|
||||||
|
|
||||||
|
logger.debug(f"Recent success rate: {success_rate:.2%} ({winning_trades}/{len(recent_trades)} trades)")
|
||||||
|
return success_rate
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error calculating success rate: {e}")
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
def _adjust_profitability_reward_multiplier(self):
|
||||||
|
"""Adjust profitability reward multiplier based on recent success rate"""
|
||||||
|
try:
|
||||||
|
# Only adjust every 5 minutes to avoid too frequent changes
|
||||||
|
current_time = datetime.now()
|
||||||
|
time_since_last_adjustment = (current_time - self.last_profitability_adjustment).total_seconds()
|
||||||
|
|
||||||
|
if time_since_last_adjustment < 300: # 5 minutes
|
||||||
|
return
|
||||||
|
|
||||||
|
success_rate = self._calculate_recent_success_rate()
|
||||||
|
|
||||||
|
# Only adjust if we have enough trades
|
||||||
|
if len(self.trade_records) < 10:
|
||||||
|
return
|
||||||
|
|
||||||
|
old_multiplier = self.profitability_reward_multiplier
|
||||||
|
|
||||||
|
# Increase multiplier if success rate > 60%
|
||||||
|
if success_rate > self.success_rate_increase_threshold:
|
||||||
|
self.profitability_reward_multiplier = min(
|
||||||
|
self.max_profitability_multiplier,
|
||||||
|
self.profitability_reward_multiplier + self.profitability_adjustment_step
|
||||||
|
)
|
||||||
|
logger.info(f"🎯 SUCCESS RATE HIGH ({success_rate:.1%}) - Increased profitability multiplier: {old_multiplier:.1f} → {self.profitability_reward_multiplier:.1f}")
|
||||||
|
|
||||||
|
# Decrease multiplier if success rate < 51%
|
||||||
|
elif success_rate < self.success_rate_decrease_threshold:
|
||||||
|
self.profitability_reward_multiplier = max(
|
||||||
|
self.min_profitability_multiplier,
|
||||||
|
self.profitability_reward_multiplier - self.profitability_adjustment_step
|
||||||
|
)
|
||||||
|
logger.info(f"⚠️ SUCCESS RATE LOW ({success_rate:.1%}) - Decreased profitability multiplier: {old_multiplier:.1f} → {self.profitability_reward_multiplier:.1f}")
|
||||||
|
|
||||||
|
else:
|
||||||
|
logger.debug(f"Success rate {success_rate:.1%} in acceptable range - keeping multiplier at {self.profitability_reward_multiplier:.1f}")
|
||||||
|
|
||||||
|
self.last_profitability_adjustment = current_time
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error adjusting profitability reward multiplier: {e}")
|
||||||
|
|
||||||
|
def get_profitability_reward_multiplier(self) -> float:
|
||||||
|
"""Get current profitability reward multiplier
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
float: Current profitability reward multiplier
|
||||||
|
"""
|
||||||
|
return self.profitability_reward_multiplier
|
||||||
|
|
||||||
def _can_reenable_live_trading(self) -> bool:
|
def _can_reenable_live_trading(self) -> bool:
|
||||||
"""Check if trading performance has improved enough to re-enable live trading
|
"""Check if trading performance has improved enough to re-enable live trading
|
||||||
|
|
||||||
@ -1198,7 +1287,11 @@ class TradingExecutor:
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.trade_history.append(trade_record)
|
self.trade_history.append(trade_record)
|
||||||
|
self.trade_records.append(trade_record) # Add to trade records for success rate tracking
|
||||||
self.daily_loss += max(0, -pnl) # Add to daily loss if negative
|
self.daily_loss += max(0, -pnl) # Add to daily loss if negative
|
||||||
|
|
||||||
|
# Adjust profitability reward multiplier based on recent performance
|
||||||
|
self._adjust_profitability_reward_multiplier()
|
||||||
|
|
||||||
# Update consecutive losses
|
# Update consecutive losses
|
||||||
if pnl < -0.001: # A losing trade
|
if pnl < -0.001: # A losing trade
|
||||||
@ -1289,8 +1382,12 @@ class TradingExecutor:
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.trade_history.append(trade_record)
|
self.trade_history.append(trade_record)
|
||||||
|
self.trade_records.append(trade_record) # Add to trade records for success rate tracking
|
||||||
self.daily_loss += max(0, -(pnl - fees)) # Add to daily loss if negative
|
self.daily_loss += max(0, -(pnl - fees)) # Add to daily loss if negative
|
||||||
|
|
||||||
|
# Adjust profitability reward multiplier based on recent performance
|
||||||
|
self._adjust_profitability_reward_multiplier()
|
||||||
|
|
||||||
# Update consecutive losses
|
# Update consecutive losses
|
||||||
if pnl < -0.001: # A losing trade
|
if pnl < -0.001: # A losing trade
|
||||||
self.consecutive_losses += 1
|
self.consecutive_losses += 1
|
||||||
@ -1356,7 +1453,11 @@ class TradingExecutor:
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.trade_history.append(trade_record)
|
self.trade_history.append(trade_record)
|
||||||
|
self.trade_records.append(trade_record) # Add to trade records for success rate tracking
|
||||||
self.daily_loss += max(0, -pnl) # Add to daily loss if negative
|
self.daily_loss += max(0, -pnl) # Add to daily loss if negative
|
||||||
|
|
||||||
|
# Adjust profitability reward multiplier based on recent performance
|
||||||
|
self._adjust_profitability_reward_multiplier()
|
||||||
|
|
||||||
# Update consecutive losses
|
# Update consecutive losses
|
||||||
if pnl < -0.001: # A losing trade
|
if pnl < -0.001: # A losing trade
|
||||||
@ -1428,8 +1529,12 @@ class TradingExecutor:
|
|||||||
)
|
)
|
||||||
|
|
||||||
self.trade_history.append(trade_record)
|
self.trade_history.append(trade_record)
|
||||||
|
self.trade_records.append(trade_record) # Add to trade records for success rate tracking
|
||||||
self.daily_loss += max(0, -(pnl - fees)) # Add to daily loss if negative
|
self.daily_loss += max(0, -(pnl - fees)) # Add to daily loss if negative
|
||||||
|
|
||||||
|
# Adjust profitability reward multiplier based on recent performance
|
||||||
|
self._adjust_profitability_reward_multiplier()
|
||||||
|
|
||||||
# Update consecutive losses
|
# Update consecutive losses
|
||||||
if pnl < -0.001: # A losing trade
|
if pnl < -0.001: # A losing trade
|
||||||
self.consecutive_losses += 1
|
self.consecutive_losses += 1
|
||||||
|
218
run_simple_dashboard.py
Normal file
218
run_simple_dashboard.py
Normal file
@ -0,0 +1,218 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Simple Dashboard Runner - Fixed version for testing
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
# Fix OpenMP library conflicts
|
||||||
|
os.environ['KMP_DUPLICATE_LIB_OK'] = 'TRUE'
|
||||||
|
os.environ['OMP_NUM_THREADS'] = '4'
|
||||||
|
|
||||||
|
# Fix matplotlib backend
|
||||||
|
import matplotlib
|
||||||
|
matplotlib.use('Agg')
|
||||||
|
|
||||||
|
# Add project root to path
|
||||||
|
project_root = Path(__file__).parent
|
||||||
|
sys.path.insert(0, str(project_root))
|
||||||
|
|
||||||
|
# Setup logging
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def create_simple_dashboard():
|
||||||
|
"""Create a simple working dashboard"""
|
||||||
|
try:
|
||||||
|
import dash
|
||||||
|
from dash import html, dcc, Input, Output
|
||||||
|
import plotly.graph_objs as go
|
||||||
|
import pandas as pd
|
||||||
|
import numpy as np
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
# Create Dash app
|
||||||
|
app = dash.Dash(__name__)
|
||||||
|
|
||||||
|
# Simple layout
|
||||||
|
app.layout = html.Div([
|
||||||
|
html.H1("Trading System Dashboard", style={'textAlign': 'center', 'color': '#2c3e50'}),
|
||||||
|
|
||||||
|
html.Div([
|
||||||
|
html.Div([
|
||||||
|
html.H3("System Status", style={'color': '#27ae60'}),
|
||||||
|
html.P(id='system-status', children="System: RUNNING", style={'fontSize': '18px'}),
|
||||||
|
html.P(id='current-time', children=f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"),
|
||||||
|
], style={'width': '48%', 'display': 'inline-block', 'padding': '20px'}),
|
||||||
|
|
||||||
|
html.Div([
|
||||||
|
html.H3("Trading Stats", style={'color': '#3498db'}),
|
||||||
|
html.P("Total Trades: 0"),
|
||||||
|
html.P("Success Rate: 0%"),
|
||||||
|
html.P("Current PnL: $0.00"),
|
||||||
|
], style={'width': '48%', 'display': 'inline-block', 'padding': '20px'}),
|
||||||
|
]),
|
||||||
|
|
||||||
|
html.Div([
|
||||||
|
dcc.Graph(id='price-chart'),
|
||||||
|
], style={'padding': '20px'}),
|
||||||
|
|
||||||
|
html.Div([
|
||||||
|
dcc.Graph(id='performance-chart'),
|
||||||
|
], style={'padding': '20px'}),
|
||||||
|
|
||||||
|
# Auto-refresh component
|
||||||
|
dcc.Interval(
|
||||||
|
id='interval-component',
|
||||||
|
interval=5000, # Update every 5 seconds
|
||||||
|
n_intervals=0
|
||||||
|
)
|
||||||
|
])
|
||||||
|
|
||||||
|
# Callback for updating time
|
||||||
|
@app.callback(
|
||||||
|
Output('current-time', 'children'),
|
||||||
|
Input('interval-component', 'n_intervals')
|
||||||
|
)
|
||||||
|
def update_time(n):
|
||||||
|
return f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
|
||||||
|
|
||||||
|
# Callback for price chart
|
||||||
|
@app.callback(
|
||||||
|
Output('price-chart', 'figure'),
|
||||||
|
Input('interval-component', 'n_intervals')
|
||||||
|
)
|
||||||
|
def update_price_chart(n):
|
||||||
|
# Generate sample data
|
||||||
|
dates = pd.date_range(start=datetime.now() - timedelta(hours=24),
|
||||||
|
end=datetime.now(), freq='1H')
|
||||||
|
prices = 3000 + np.cumsum(np.random.randn(len(dates)) * 10)
|
||||||
|
|
||||||
|
fig = go.Figure()
|
||||||
|
fig.add_trace(go.Scatter(
|
||||||
|
x=dates,
|
||||||
|
y=prices,
|
||||||
|
mode='lines',
|
||||||
|
name='ETH/USDT',
|
||||||
|
line=dict(color='#3498db', width=2)
|
||||||
|
))
|
||||||
|
|
||||||
|
fig.update_layout(
|
||||||
|
title='ETH/USDT Price Chart (24H)',
|
||||||
|
xaxis_title='Time',
|
||||||
|
yaxis_title='Price (USD)',
|
||||||
|
template='plotly_white',
|
||||||
|
height=400
|
||||||
|
)
|
||||||
|
|
||||||
|
return fig
|
||||||
|
|
||||||
|
# Callback for performance chart
|
||||||
|
@app.callback(
|
||||||
|
Output('performance-chart', 'figure'),
|
||||||
|
Input('interval-component', 'n_intervals')
|
||||||
|
)
|
||||||
|
def update_performance_chart(n):
|
||||||
|
# Generate sample performance data
|
||||||
|
dates = pd.date_range(start=datetime.now() - timedelta(days=7),
|
||||||
|
end=datetime.now(), freq='1D')
|
||||||
|
performance = np.cumsum(np.random.randn(len(dates)) * 0.02) * 100
|
||||||
|
|
||||||
|
fig = go.Figure()
|
||||||
|
fig.add_trace(go.Scatter(
|
||||||
|
x=dates,
|
||||||
|
y=performance,
|
||||||
|
mode='lines+markers',
|
||||||
|
name='Portfolio Performance',
|
||||||
|
line=dict(color='#27ae60', width=3),
|
||||||
|
marker=dict(size=6)
|
||||||
|
))
|
||||||
|
|
||||||
|
fig.update_layout(
|
||||||
|
title='Portfolio Performance (7 Days)',
|
||||||
|
xaxis_title='Date',
|
||||||
|
yaxis_title='Performance (%)',
|
||||||
|
template='plotly_white',
|
||||||
|
height=400
|
||||||
|
)
|
||||||
|
|
||||||
|
return fig
|
||||||
|
|
||||||
|
return app
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error creating dashboard: {e}")
|
||||||
|
import traceback
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
return None
|
||||||
|
|
||||||
|
def test_data_provider():
|
||||||
|
"""Test data provider in background"""
|
||||||
|
try:
|
||||||
|
from core.data_provider import DataProvider
|
||||||
|
from core.api_rate_limiter import get_rate_limiter
|
||||||
|
|
||||||
|
logger.info("Testing data provider...")
|
||||||
|
|
||||||
|
# Create data provider
|
||||||
|
data_provider = DataProvider(
|
||||||
|
symbols=['ETH/USDT'],
|
||||||
|
timeframes=['1m', '5m']
|
||||||
|
)
|
||||||
|
|
||||||
|
# Test getting data
|
||||||
|
df = data_provider.get_historical_data('ETH/USDT', '1m', limit=10)
|
||||||
|
if df is not None and len(df) > 0:
|
||||||
|
logger.info(f"✓ Data provider working: {len(df)} candles retrieved")
|
||||||
|
else:
|
||||||
|
logger.warning("⚠ Data provider returned no data (rate limiting)")
|
||||||
|
|
||||||
|
# Test rate limiter status
|
||||||
|
rate_limiter = get_rate_limiter()
|
||||||
|
status = rate_limiter.get_all_endpoint_status()
|
||||||
|
logger.info(f"Rate limiter status: {status}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Data provider test error: {e}")
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Main function"""
|
||||||
|
logger.info("=" * 60)
|
||||||
|
logger.info("SIMPLE DASHBOARD RUNNER - TESTING SYSTEM")
|
||||||
|
logger.info("=" * 60)
|
||||||
|
|
||||||
|
# Test data provider in background
|
||||||
|
data_thread = threading.Thread(target=test_data_provider, daemon=True)
|
||||||
|
data_thread.start()
|
||||||
|
|
||||||
|
# Create and run dashboard
|
||||||
|
app = create_simple_dashboard()
|
||||||
|
if app is None:
|
||||||
|
logger.error("Failed to create dashboard")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
logger.info("Starting dashboard server...")
|
||||||
|
logger.info("Dashboard URL: http://127.0.0.1:8050")
|
||||||
|
logger.info("Press Ctrl+C to stop")
|
||||||
|
|
||||||
|
# Run the dashboard
|
||||||
|
app.run(debug=False, host='127.0.0.1', port=8050, use_reloader=False)
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.info("Dashboard stopped by user")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Dashboard error: {e}")
|
||||||
|
import traceback
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
294
test_profitability_reward_system.py
Normal file
294
test_profitability_reward_system.py
Normal file
@ -0,0 +1,294 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Test script for the dynamic profitability reward system
|
||||||
|
|
||||||
|
This script tests:
|
||||||
|
1. Fee reversion to normal 0.1% (0.001)
|
||||||
|
2. Dynamic profitability reward multiplier adjustment
|
||||||
|
3. Success rate calculation
|
||||||
|
4. Integration with dashboard display
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
# Add project root to path
|
||||||
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||||||
|
|
||||||
|
from core.trading_executor import TradingExecutor, TradeRecord
|
||||||
|
from core.orchestrator import TradingOrchestrator
|
||||||
|
from core.data_provider import DataProvider
|
||||||
|
|
||||||
|
def test_fee_configuration():
|
||||||
|
"""Test that fees are reverted to normal 0.1%"""
|
||||||
|
print("=" * 60)
|
||||||
|
print("🧪 TESTING FEE CONFIGURATION")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
executor = TradingExecutor()
|
||||||
|
|
||||||
|
# Check fee configuration
|
||||||
|
expected_open_fee = 0.001 # 0.1%
|
||||||
|
expected_close_fee = 0.001 # 0.1%
|
||||||
|
expected_total_fee = 0.002 # 0.2%
|
||||||
|
|
||||||
|
actual_open_fee = executor.trading_fees['open_fee_percent']
|
||||||
|
actual_close_fee = executor.trading_fees['close_fee_percent']
|
||||||
|
actual_total_fee = executor.trading_fees['total_round_trip_fee']
|
||||||
|
|
||||||
|
print(f"Expected Open Fee: {expected_open_fee} (0.1%)")
|
||||||
|
print(f"Actual Open Fee: {actual_open_fee} (0.1%)")
|
||||||
|
print(f"✅ Open Fee: {'PASS' if actual_open_fee == expected_open_fee else 'FAIL'}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
print(f"Expected Close Fee: {expected_close_fee} (0.1%)")
|
||||||
|
print(f"Actual Close Fee: {actual_close_fee} (0.1%)")
|
||||||
|
print(f"✅ Close Fee: {'PASS' if actual_close_fee == expected_close_fee else 'FAIL'}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
print(f"Expected Total Fee: {expected_total_fee} (0.2%)")
|
||||||
|
print(f"Actual Total Fee: {actual_total_fee} (0.2%)")
|
||||||
|
print(f"✅ Total Fee: {'PASS' if actual_total_fee == expected_total_fee else 'FAIL'}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
return actual_open_fee == expected_open_fee and actual_close_fee == expected_close_fee
|
||||||
|
|
||||||
|
def test_profitability_multiplier_initialization():
|
||||||
|
"""Test profitability multiplier initialization"""
|
||||||
|
print("=" * 60)
|
||||||
|
print("🧪 TESTING PROFITABILITY MULTIPLIER INITIALIZATION")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
executor = TradingExecutor()
|
||||||
|
|
||||||
|
# Check initial values
|
||||||
|
initial_multiplier = executor.profitability_reward_multiplier
|
||||||
|
min_multiplier = executor.min_profitability_multiplier
|
||||||
|
max_multiplier = executor.max_profitability_multiplier
|
||||||
|
adjustment_step = executor.profitability_adjustment_step
|
||||||
|
|
||||||
|
print(f"Initial Multiplier: {initial_multiplier} (should be 0.0)")
|
||||||
|
print(f"Min Multiplier: {min_multiplier} (should be 0.0)")
|
||||||
|
print(f"Max Multiplier: {max_multiplier} (should be 2.0)")
|
||||||
|
print(f"Adjustment Step: {adjustment_step} (should be 0.1)")
|
||||||
|
print()
|
||||||
|
|
||||||
|
# Check thresholds
|
||||||
|
increase_threshold = executor.success_rate_increase_threshold
|
||||||
|
decrease_threshold = executor.success_rate_decrease_threshold
|
||||||
|
trades_window = executor.recent_trades_window
|
||||||
|
|
||||||
|
print(f"Increase Threshold: {increase_threshold:.1%} (should be 60%)")
|
||||||
|
print(f"Decrease Threshold: {decrease_threshold:.1%} (should be 51%)")
|
||||||
|
print(f"Trades Window: {trades_window} (should be 20)")
|
||||||
|
print()
|
||||||
|
|
||||||
|
# Test getter method
|
||||||
|
multiplier_from_getter = executor.get_profitability_reward_multiplier()
|
||||||
|
print(f"Multiplier via getter: {multiplier_from_getter}")
|
||||||
|
print(f"✅ Getter method: {'PASS' if multiplier_from_getter == initial_multiplier else 'FAIL'}")
|
||||||
|
|
||||||
|
return (initial_multiplier == 0.0 and
|
||||||
|
min_multiplier == 0.0 and
|
||||||
|
max_multiplier == 2.0 and
|
||||||
|
adjustment_step == 0.1)
|
||||||
|
|
||||||
|
def simulate_trades_and_test_adjustment(executor, winning_trades, total_trades):
|
||||||
|
"""Simulate trades and test multiplier adjustment"""
|
||||||
|
print(f"📊 Simulating {winning_trades}/{total_trades} winning trades ({winning_trades/total_trades:.1%} success rate)")
|
||||||
|
|
||||||
|
# Clear existing trade records
|
||||||
|
executor.trade_records = []
|
||||||
|
|
||||||
|
# Create simulated trade records
|
||||||
|
base_time = datetime.now() - timedelta(hours=1)
|
||||||
|
|
||||||
|
for i in range(total_trades):
|
||||||
|
# Create winning or losing trade based on ratio
|
||||||
|
is_winning = i < winning_trades
|
||||||
|
pnl = 10.0 if is_winning else -5.0 # $10 profit or $5 loss
|
||||||
|
|
||||||
|
trade_record = TradeRecord(
|
||||||
|
symbol="ETH/USDT",
|
||||||
|
side="LONG",
|
||||||
|
quantity=0.01,
|
||||||
|
entry_price=3000.0,
|
||||||
|
exit_price=3010.0 if is_winning else 2995.0,
|
||||||
|
entry_time=base_time + timedelta(minutes=i*2),
|
||||||
|
exit_time=base_time + timedelta(minutes=i*2+1),
|
||||||
|
pnl=pnl,
|
||||||
|
fees=2.0,
|
||||||
|
confidence=0.8,
|
||||||
|
net_pnl=pnl - 2.0 # After fees
|
||||||
|
)
|
||||||
|
|
||||||
|
executor.trade_records.append(trade_record)
|
||||||
|
|
||||||
|
# Force adjustment by setting last adjustment time to past
|
||||||
|
executor.last_profitability_adjustment = datetime.now() - timedelta(minutes=10)
|
||||||
|
|
||||||
|
# Get initial multiplier
|
||||||
|
initial_multiplier = executor.get_profitability_reward_multiplier()
|
||||||
|
|
||||||
|
# Calculate success rate
|
||||||
|
success_rate = executor._calculate_recent_success_rate()
|
||||||
|
print(f"Calculated success rate: {success_rate:.1%}")
|
||||||
|
|
||||||
|
# Trigger adjustment
|
||||||
|
executor._adjust_profitability_reward_multiplier()
|
||||||
|
|
||||||
|
# Get new multiplier
|
||||||
|
new_multiplier = executor.get_profitability_reward_multiplier()
|
||||||
|
|
||||||
|
print(f"Initial multiplier: {initial_multiplier:.1f}")
|
||||||
|
print(f"New multiplier: {new_multiplier:.1f}")
|
||||||
|
|
||||||
|
# Determine expected change
|
||||||
|
if success_rate > executor.success_rate_increase_threshold:
|
||||||
|
expected_change = "increase"
|
||||||
|
expected_new = min(executor.max_profitability_multiplier, initial_multiplier + executor.profitability_adjustment_step)
|
||||||
|
elif success_rate < executor.success_rate_decrease_threshold:
|
||||||
|
expected_change = "decrease"
|
||||||
|
expected_new = max(executor.min_profitability_multiplier, initial_multiplier - executor.profitability_adjustment_step)
|
||||||
|
else:
|
||||||
|
expected_change = "no change"
|
||||||
|
expected_new = initial_multiplier
|
||||||
|
|
||||||
|
print(f"Expected change: {expected_change}")
|
||||||
|
print(f"Expected new value: {expected_new:.1f}")
|
||||||
|
|
||||||
|
success = abs(new_multiplier - expected_new) < 0.01
|
||||||
|
print(f"✅ Adjustment: {'PASS' if success else 'FAIL'}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
return success
|
||||||
|
|
||||||
|
def test_orchestrator_integration():
|
||||||
|
"""Test orchestrator integration with profitability multiplier"""
|
||||||
|
print("=" * 60)
|
||||||
|
print("🧪 TESTING ORCHESTRATOR INTEGRATION")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
# Create components
|
||||||
|
data_provider = DataProvider()
|
||||||
|
executor = TradingExecutor()
|
||||||
|
orchestrator = TradingOrchestrator(data_provider=data_provider)
|
||||||
|
|
||||||
|
# Connect executor to orchestrator
|
||||||
|
orchestrator.set_trading_executor(executor)
|
||||||
|
|
||||||
|
# Set a test multiplier
|
||||||
|
executor.profitability_reward_multiplier = 1.5
|
||||||
|
|
||||||
|
# Test getting multiplier through orchestrator
|
||||||
|
multiplier = orchestrator.get_profitability_reward_multiplier()
|
||||||
|
print(f"Multiplier via orchestrator: {multiplier}")
|
||||||
|
print(f"✅ Orchestrator getter: {'PASS' if multiplier == 1.5 else 'FAIL'}")
|
||||||
|
|
||||||
|
# Test enhanced reward calculation
|
||||||
|
base_pnl = 100.0 # $100 profit
|
||||||
|
confidence = 0.8
|
||||||
|
|
||||||
|
enhanced_reward = orchestrator.calculate_enhanced_reward(base_pnl, confidence)
|
||||||
|
expected_enhanced = base_pnl * (1.0 + 1.5) # 100 * 2.5 = 250
|
||||||
|
|
||||||
|
print(f"Base P&L: ${base_pnl:.2f}")
|
||||||
|
print(f"Enhanced reward: ${enhanced_reward:.2f}")
|
||||||
|
print(f"Expected: ${expected_enhanced:.2f}")
|
||||||
|
print(f"✅ Enhanced reward: {'PASS' if abs(enhanced_reward - expected_enhanced) < 0.01 else 'FAIL'}")
|
||||||
|
|
||||||
|
# Test with losing trade (should not be enhanced)
|
||||||
|
losing_pnl = -50.0
|
||||||
|
enhanced_losing = orchestrator.calculate_enhanced_reward(losing_pnl, confidence)
|
||||||
|
print(f"Losing P&L: ${losing_pnl:.2f}")
|
||||||
|
print(f"Enhanced losing: ${enhanced_losing:.2f}")
|
||||||
|
print(f"✅ No enhancement for losses: {'PASS' if enhanced_losing == losing_pnl else 'FAIL'}")
|
||||||
|
|
||||||
|
return multiplier == 1.5 and abs(enhanced_reward - expected_enhanced) < 0.01
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Run all tests"""
|
||||||
|
print("🚀 DYNAMIC PROFITABILITY REWARD SYSTEM TEST")
|
||||||
|
print("Testing fee reversion and dynamic reward adjustment")
|
||||||
|
print()
|
||||||
|
|
||||||
|
all_tests_passed = True
|
||||||
|
|
||||||
|
# Test 1: Fee configuration
|
||||||
|
try:
|
||||||
|
fee_test_passed = test_fee_configuration()
|
||||||
|
all_tests_passed = all_tests_passed and fee_test_passed
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Fee configuration test failed: {e}")
|
||||||
|
all_tests_passed = False
|
||||||
|
|
||||||
|
# Test 2: Profitability multiplier initialization
|
||||||
|
try:
|
||||||
|
init_test_passed = test_profitability_multiplier_initialization()
|
||||||
|
all_tests_passed = all_tests_passed and init_test_passed
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Initialization test failed: {e}")
|
||||||
|
all_tests_passed = False
|
||||||
|
|
||||||
|
# Test 3: Multiplier adjustment scenarios
|
||||||
|
print("=" * 60)
|
||||||
|
print("🧪 TESTING MULTIPLIER ADJUSTMENT SCENARIOS")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
executor = TradingExecutor()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Scenario 1: High success rate (should increase multiplier)
|
||||||
|
print("Scenario 1: High success rate (65% - should increase)")
|
||||||
|
high_success_test = simulate_trades_and_test_adjustment(executor, 13, 20) # 65%
|
||||||
|
all_tests_passed = all_tests_passed and high_success_test
|
||||||
|
|
||||||
|
# Scenario 2: Low success rate (should decrease multiplier)
|
||||||
|
print("Scenario 2: Low success rate (45% - should decrease)")
|
||||||
|
low_success_test = simulate_trades_and_test_adjustment(executor, 9, 20) # 45%
|
||||||
|
all_tests_passed = all_tests_passed and low_success_test
|
||||||
|
|
||||||
|
# Scenario 3: Medium success rate (should not change)
|
||||||
|
print("Scenario 3: Medium success rate (55% - should not change)")
|
||||||
|
medium_success_test = simulate_trades_and_test_adjustment(executor, 11, 20) # 55%
|
||||||
|
all_tests_passed = all_tests_passed and medium_success_test
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Adjustment scenario tests failed: {e}")
|
||||||
|
all_tests_passed = False
|
||||||
|
|
||||||
|
# Test 4: Orchestrator integration
|
||||||
|
try:
|
||||||
|
orchestrator_test_passed = test_orchestrator_integration()
|
||||||
|
all_tests_passed = all_tests_passed and orchestrator_test_passed
|
||||||
|
except Exception as e:
|
||||||
|
print(f"❌ Orchestrator integration test failed: {e}")
|
||||||
|
all_tests_passed = False
|
||||||
|
|
||||||
|
# Final results
|
||||||
|
print("=" * 60)
|
||||||
|
print("📋 TEST RESULTS SUMMARY")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
if all_tests_passed:
|
||||||
|
print("🎉 ALL TESTS PASSED!")
|
||||||
|
print("✅ Fees reverted to normal 0.1%")
|
||||||
|
print("✅ Dynamic profitability multiplier working")
|
||||||
|
print("✅ Success rate calculation accurate")
|
||||||
|
print("✅ Orchestrator integration functional")
|
||||||
|
print()
|
||||||
|
print("🚀 System ready for trading with dynamic profitability rewards!")
|
||||||
|
print("📈 The model will learn to prioritize more profitable trades over time")
|
||||||
|
print("🎯 Success rate >60% → increase reward multiplier")
|
||||||
|
print("⚠️ Success rate <51% → decrease reward multiplier")
|
||||||
|
else:
|
||||||
|
print("❌ SOME TESTS FAILED!")
|
||||||
|
print("Please check the error messages above and fix issues before trading.")
|
||||||
|
|
||||||
|
return all_tests_passed
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
success = main()
|
||||||
|
sys.exit(0 if success else 1)
|
@ -496,6 +496,7 @@ class CleanTradingDashboard:
|
|||||||
Output('current-position', 'children'),
|
Output('current-position', 'children'),
|
||||||
Output('trade-count', 'children'),
|
Output('trade-count', 'children'),
|
||||||
Output('portfolio-value', 'children'),
|
Output('portfolio-value', 'children'),
|
||||||
|
Output('profitability-multiplier', 'children'),
|
||||||
Output('mexc-status', 'children')],
|
Output('mexc-status', 'children')],
|
||||||
[Input('interval-component', 'n_intervals')]
|
[Input('interval-component', 'n_intervals')]
|
||||||
)
|
)
|
||||||
@ -600,6 +601,20 @@ class CleanTradingDashboard:
|
|||||||
portfolio_value = current_balance + total_session_pnl # Live balance + unrealized P&L
|
portfolio_value = current_balance + total_session_pnl # Live balance + unrealized P&L
|
||||||
portfolio_str = f"${portfolio_value:.2f}"
|
portfolio_str = f"${portfolio_value:.2f}"
|
||||||
|
|
||||||
|
# Profitability multiplier - get from trading executor
|
||||||
|
profitability_multiplier = 0.0
|
||||||
|
success_rate = 0.0
|
||||||
|
if self.trading_executor and hasattr(self.trading_executor, 'get_profitability_reward_multiplier'):
|
||||||
|
profitability_multiplier = self.trading_executor.get_profitability_reward_multiplier()
|
||||||
|
if hasattr(self.trading_executor, '_calculate_recent_success_rate'):
|
||||||
|
success_rate = self.trading_executor._calculate_recent_success_rate()
|
||||||
|
|
||||||
|
# Format profitability multiplier display
|
||||||
|
if profitability_multiplier > 0:
|
||||||
|
multiplier_str = f"+{profitability_multiplier:.1f}x ({success_rate:.0%})"
|
||||||
|
else:
|
||||||
|
multiplier_str = f"0.0x ({success_rate:.0%})" if success_rate > 0 else "0.0x"
|
||||||
|
|
||||||
# MEXC status - enhanced with sync status
|
# MEXC status - enhanced with sync status
|
||||||
mexc_status = "SIM"
|
mexc_status = "SIM"
|
||||||
if self.trading_executor:
|
if self.trading_executor:
|
||||||
@ -607,11 +622,11 @@ class CleanTradingDashboard:
|
|||||||
if hasattr(self.trading_executor, 'simulation_mode') and not self.trading_executor.simulation_mode:
|
if hasattr(self.trading_executor, 'simulation_mode') and not self.trading_executor.simulation_mode:
|
||||||
mexc_status = "LIVE+SYNC" # Indicate live trading with position sync
|
mexc_status = "LIVE+SYNC" # Indicate live trading with position sync
|
||||||
|
|
||||||
return price_str, session_pnl_str, position_str, trade_str, portfolio_str, mexc_status
|
return price_str, session_pnl_str, position_str, trade_str, portfolio_str, multiplier_str, mexc_status
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error updating metrics: {e}")
|
logger.error(f"Error updating metrics: {e}")
|
||||||
return "Error", "$0.00", "Error", "0", "$100.00", "ERROR"
|
return "Error", "$0.00", "Error", "0", "$100.00", "0.0x", "ERROR"
|
||||||
|
|
||||||
@self.app.callback(
|
@self.app.callback(
|
||||||
Output('recent-decisions', 'children'),
|
Output('recent-decisions', 'children'),
|
||||||
@ -2311,49 +2326,56 @@ class CleanTradingDashboard:
|
|||||||
cob_data = self.data_provider.get_latest_cob_data(symbol)
|
cob_data = self.data_provider.get_latest_cob_data(symbol)
|
||||||
logger.debug(f"COB data type for {symbol}: {type(cob_data)}, data: {cob_data}")
|
logger.debug(f"COB data type for {symbol}: {type(cob_data)}, data: {cob_data}")
|
||||||
|
|
||||||
if cob_data and isinstance(cob_data, dict) and 'stats' in cob_data:
|
if cob_data and isinstance(cob_data, dict):
|
||||||
logger.debug(f"COB snapshot available for {symbol} from centralized data provider")
|
# Validate COB data structure
|
||||||
|
if 'stats' in cob_data and cob_data['stats']:
|
||||||
# Create a snapshot object from the data provider's data
|
logger.debug(f"COB snapshot available for {symbol} from centralized data provider")
|
||||||
class COBSnapshot:
|
|
||||||
def __init__(self, data):
|
# Create a snapshot object from the data provider's data
|
||||||
# Convert list format [[price, qty], ...] to dictionary format
|
class COBSnapshot:
|
||||||
raw_bids = data.get('bids', [])
|
def __init__(self, data):
|
||||||
raw_asks = data.get('asks', [])
|
# Convert list format [[price, qty], ...] to dictionary format
|
||||||
|
raw_bids = data.get('bids', [])
|
||||||
# Convert to dictionary format expected by component manager
|
raw_asks = data.get('asks', [])
|
||||||
self.consolidated_bids = []
|
|
||||||
for bid in raw_bids:
|
# Convert to dictionary format expected by component manager
|
||||||
if isinstance(bid, list) and len(bid) >= 2:
|
self.consolidated_bids = []
|
||||||
self.consolidated_bids.append({
|
for bid in raw_bids:
|
||||||
'price': bid[0],
|
if isinstance(bid, list) and len(bid) >= 2:
|
||||||
'size': bid[1],
|
self.consolidated_bids.append({
|
||||||
'total_size': bid[1],
|
'price': bid[0],
|
||||||
'total_volume_usd': bid[0] * bid[1]
|
'size': bid[1],
|
||||||
})
|
'total_size': bid[1],
|
||||||
|
'total_volume_usd': bid[0] * bid[1]
|
||||||
self.consolidated_asks = []
|
})
|
||||||
for ask in raw_asks:
|
|
||||||
if isinstance(ask, list) and len(ask) >= 2:
|
self.consolidated_asks = []
|
||||||
self.consolidated_asks.append({
|
for ask in raw_asks:
|
||||||
'price': ask[0],
|
if isinstance(ask, list) and len(ask) >= 2:
|
||||||
'size': ask[1],
|
self.consolidated_asks.append({
|
||||||
'total_size': ask[1],
|
'price': ask[0],
|
||||||
'total_volume_usd': ask[0] * ask[1]
|
'size': ask[1],
|
||||||
})
|
'total_size': ask[1],
|
||||||
|
'total_volume_usd': ask[0] * ask[1]
|
||||||
self.stats = data.get('stats', {})
|
})
|
||||||
# Add direct attributes for new format compatibility
|
|
||||||
self.volume_weighted_mid = self.stats.get('mid_price', 0)
|
self.stats = data.get('stats', {})
|
||||||
self.spread_bps = self.stats.get('spread_bps', 0)
|
# Add direct attributes for new format compatibility
|
||||||
self.liquidity_imbalance = self.stats.get('imbalance', 0)
|
self.volume_weighted_mid = self.stats.get('mid_price', 0)
|
||||||
self.total_bid_liquidity = self.stats.get('bid_liquidity', 0)
|
self.spread_bps = self.stats.get('spread_bps', 0)
|
||||||
self.total_ask_liquidity = self.stats.get('ask_liquidity', 0)
|
self.liquidity_imbalance = self.stats.get('imbalance', 0)
|
||||||
self.exchanges_active = ['Binance'] # Default for now
|
self.total_bid_liquidity = self.stats.get('bid_liquidity', 0)
|
||||||
|
self.total_ask_liquidity = self.stats.get('ask_liquidity', 0)
|
||||||
return COBSnapshot(cob_data)
|
self.exchanges_active = ['Binance'] # Default for now
|
||||||
|
|
||||||
|
return COBSnapshot(cob_data)
|
||||||
|
else:
|
||||||
|
# Data exists but no stats - this is the "Invalid COB data" case
|
||||||
|
logger.debug(f"COB data for {symbol} missing stats structure: {type(cob_data)}, keys: {list(cob_data.keys()) if isinstance(cob_data, dict) else 'not dict'}")
|
||||||
|
return None
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Invalid COB data for {symbol}: type={type(cob_data)}, has_stats={'stats' in cob_data if isinstance(cob_data, dict) else False}")
|
logger.debug(f"No COB data available for {symbol} from data provider")
|
||||||
|
return None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error getting COB data from data provider: {e}")
|
logger.error(f"Error getting COB data from data provider: {e}")
|
||||||
|
|
||||||
@ -5358,6 +5380,18 @@ class CleanTradingDashboard:
|
|||||||
|
|
||||||
self.latest_cob_data[symbol] = cob_snapshot
|
self.latest_cob_data[symbol] = cob_snapshot
|
||||||
|
|
||||||
|
# Store in history for moving average calculations
|
||||||
|
if not hasattr(self, 'cob_data_history'):
|
||||||
|
self.cob_data_history = {'ETH/USDT': deque(maxlen=61), 'BTC/USDT': deque(maxlen=61)}
|
||||||
|
|
||||||
|
if symbol in self.cob_data_history:
|
||||||
|
self.cob_data_history[symbol].append(cob_snapshot)
|
||||||
|
|
||||||
|
# Update last update timestamp
|
||||||
|
if not hasattr(self, 'cob_last_update'):
|
||||||
|
self.cob_last_update = {}
|
||||||
|
self.cob_last_update[symbol] = time.time()
|
||||||
|
|
||||||
# Update current price from COB data
|
# Update current price from COB data
|
||||||
if 'stats' in cob_snapshot and 'mid_price' in cob_snapshot['stats']:
|
if 'stats' in cob_snapshot and 'mid_price' in cob_snapshot['stats']:
|
||||||
self.current_prices[symbol] = cob_snapshot['stats']['mid_price']
|
self.current_prices[symbol] = cob_snapshot['stats']['mid_price']
|
||||||
@ -6021,33 +6055,71 @@ class CleanTradingDashboard:
|
|||||||
raise
|
raise
|
||||||
|
|
||||||
def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]:
|
def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]:
|
||||||
"""Calculate average imbalance over multiple time windows."""
|
"""Calculate Moving Averages (MA) of imbalance over different periods."""
|
||||||
stats = {}
|
stats = {}
|
||||||
now = time.time()
|
|
||||||
history = self.cob_data_history.get(symbol)
|
history = self.cob_data_history.get(symbol)
|
||||||
|
|
||||||
if not history:
|
if not history:
|
||||||
return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
|
return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
|
||||||
|
|
||||||
periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60}
|
# Convert history to list and get recent snapshots
|
||||||
|
history_list = list(history)
|
||||||
for name, duration in periods.items():
|
if not history_list:
|
||||||
recent_imbalances = []
|
return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
|
||||||
for snap in history:
|
|
||||||
# Check if snap is a valid dict with timestamp and stats
|
|
||||||
if isinstance(snap, dict) and 'timestamp' in snap and (now - snap['timestamp'] <= duration) and 'stats' in snap and snap['stats']:
|
|
||||||
imbalance = snap['stats'].get('imbalance')
|
|
||||||
if imbalance is not None:
|
|
||||||
recent_imbalances.append(imbalance)
|
|
||||||
|
|
||||||
if recent_imbalances:
|
# Extract imbalance values from recent snapshots
|
||||||
stats[name] = sum(recent_imbalances) / len(recent_imbalances)
|
imbalances = []
|
||||||
else:
|
for snap in history_list:
|
||||||
stats[name] = 0.0
|
if isinstance(snap, dict) and 'stats' in snap and snap['stats']:
|
||||||
|
imbalance = snap['stats'].get('imbalance')
|
||||||
|
if imbalance is not None:
|
||||||
|
imbalances.append(imbalance)
|
||||||
|
|
||||||
|
if not imbalances:
|
||||||
|
return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
|
||||||
|
|
||||||
|
# Calculate Moving Averages over different periods
|
||||||
|
# MA periods: 1s=1 period, 5s=5 periods, 15s=15 periods, 60s=60 periods
|
||||||
|
ma_periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60}
|
||||||
|
|
||||||
# Debug logging to verify cumulative imbalance calculation
|
for name, period in ma_periods.items():
|
||||||
|
if len(imbalances) >= period:
|
||||||
|
# Calculate SMA over the last 'period' values
|
||||||
|
recent_imbalances = imbalances[-period:]
|
||||||
|
sma_value = sum(recent_imbalances) / len(recent_imbalances)
|
||||||
|
|
||||||
|
# Also calculate EMA for better responsiveness
|
||||||
|
if period > 1:
|
||||||
|
# EMA calculation with alpha = 2/(period+1)
|
||||||
|
alpha = 2.0 / (period + 1)
|
||||||
|
ema_value = recent_imbalances[0] # Start with first value
|
||||||
|
for value in recent_imbalances[1:]:
|
||||||
|
ema_value = alpha * value + (1 - alpha) * ema_value
|
||||||
|
# Use EMA for better responsiveness
|
||||||
|
stats[name] = ema_value
|
||||||
|
else:
|
||||||
|
# For 1s, use SMA (no EMA needed)
|
||||||
|
stats[name] = sma_value
|
||||||
|
else:
|
||||||
|
# If not enough data, use available data
|
||||||
|
available_imbalances = imbalances[-min(period, len(imbalances)):]
|
||||||
|
if available_imbalances:
|
||||||
|
if len(available_imbalances) > 1:
|
||||||
|
# Calculate EMA for available data
|
||||||
|
alpha = 2.0 / (len(available_imbalances) + 1)
|
||||||
|
ema_value = available_imbalances[0]
|
||||||
|
for value in available_imbalances[1:]:
|
||||||
|
ema_value = alpha * value + (1 - alpha) * ema_value
|
||||||
|
stats[name] = ema_value
|
||||||
|
else:
|
||||||
|
# Single value, use as is
|
||||||
|
stats[name] = available_imbalances[0]
|
||||||
|
else:
|
||||||
|
stats[name] = 0.0
|
||||||
|
|
||||||
|
# Debug logging to verify MA calculation
|
||||||
if any(value != 0.0 for value in stats.values()):
|
if any(value != 0.0 for value in stats.values()):
|
||||||
logger.debug(f"[CUMULATIVE-IMBALANCE] {symbol}: {stats}")
|
logger.debug(f"[MOVING-AVERAGE-IMBALANCE] {symbol}: {stats} (from {len(imbalances)} snapshots)")
|
||||||
|
|
||||||
return stats
|
return stats
|
||||||
|
|
||||||
|
@ -412,10 +412,10 @@ class DashboardComponentManager:
|
|||||||
]),
|
]),
|
||||||
|
|
||||||
html.Div([
|
html.Div([
|
||||||
self._create_timeframe_imbalance("1s", stats.get('imbalance_1s', imbalance)),
|
self._create_timeframe_imbalance("1s", cumulative_imbalance_stats.get('1s', imbalance)),
|
||||||
self._create_timeframe_imbalance("5s", stats.get('imbalance_5s', imbalance)),
|
self._create_timeframe_imbalance("5s", cumulative_imbalance_stats.get('5s', imbalance)),
|
||||||
self._create_timeframe_imbalance("15s", stats.get('imbalance_15s', imbalance)),
|
self._create_timeframe_imbalance("15s", cumulative_imbalance_stats.get('15s', imbalance)),
|
||||||
self._create_timeframe_imbalance("60s", stats.get('imbalance_60s', imbalance)),
|
self._create_timeframe_imbalance("60s", cumulative_imbalance_stats.get('60s', imbalance)),
|
||||||
], className="d-flex justify-content-between mb-2"),
|
], className="d-flex justify-content-between mb-2"),
|
||||||
|
|
||||||
html.Div(imbalance_stats_display),
|
html.Div(imbalance_stats_display),
|
||||||
|
@ -93,6 +93,7 @@ class DashboardLayoutManager:
|
|||||||
# ("leverage-info", "Leverage", "text-primary"),
|
# ("leverage-info", "Leverage", "text-primary"),
|
||||||
("trade-count", "Trades", "text-warning"),
|
("trade-count", "Trades", "text-warning"),
|
||||||
("portfolio-value", "Portfolio", "text-secondary"),
|
("portfolio-value", "Portfolio", "text-secondary"),
|
||||||
|
("profitability-multiplier", "Profit Boost", "text-primary"),
|
||||||
("mexc-status", f"{exchange_name} API", "text-info")
|
("mexc-status", f"{exchange_name} API", "text-info")
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -986,33 +986,71 @@ class TemplatedTradingDashboard:
|
|||||||
logger.debug(f"TEMPLATED DASHBOARD: Error generating bucketed COB data: {e}")
|
logger.debug(f"TEMPLATED DASHBOARD: Error generating bucketed COB data: {e}")
|
||||||
|
|
||||||
def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]:
|
def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]:
|
||||||
"""Calculate average imbalance over multiple time windows."""
|
"""Calculate Moving Averages (MA) of imbalance over different periods."""
|
||||||
stats = {}
|
stats = {}
|
||||||
now = time.time()
|
|
||||||
history = self.cob_data_history.get(symbol)
|
history = self.cob_data_history.get(symbol)
|
||||||
|
|
||||||
if not history:
|
if not history:
|
||||||
return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
|
return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
|
||||||
|
|
||||||
periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60}
|
# Convert history to list and get recent snapshots
|
||||||
|
history_list = list(history)
|
||||||
for name, duration in periods.items():
|
if not history_list:
|
||||||
recent_imbalances = []
|
return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
|
||||||
for snap in history:
|
|
||||||
# Check if snap is a valid dict with timestamp and stats
|
|
||||||
if isinstance(snap, dict) and 'timestamp' in snap and (now - snap['timestamp'] <= duration) and 'stats' in snap and snap['stats']:
|
|
||||||
imbalance = snap['stats'].get('imbalance')
|
|
||||||
if imbalance is not None:
|
|
||||||
recent_imbalances.append(imbalance)
|
|
||||||
|
|
||||||
if recent_imbalances:
|
# Extract imbalance values from recent snapshots
|
||||||
stats[name] = sum(recent_imbalances) / len(recent_imbalances)
|
imbalances = []
|
||||||
else:
|
for snap in history_list:
|
||||||
stats[name] = 0.0
|
if isinstance(snap, dict) and 'stats' in snap and snap['stats']:
|
||||||
|
imbalance = snap['stats'].get('imbalance')
|
||||||
|
if imbalance is not None:
|
||||||
|
imbalances.append(imbalance)
|
||||||
|
|
||||||
|
if not imbalances:
|
||||||
|
return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
|
||||||
|
|
||||||
|
# Calculate Moving Averages over different periods
|
||||||
|
# MA periods: 1s=1 period, 5s=5 periods, 15s=15 periods, 60s=60 periods
|
||||||
|
ma_periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60}
|
||||||
|
|
||||||
# Debug logging to verify cumulative imbalance calculation
|
for name, period in ma_periods.items():
|
||||||
|
if len(imbalances) >= period:
|
||||||
|
# Calculate SMA over the last 'period' values
|
||||||
|
recent_imbalances = imbalances[-period:]
|
||||||
|
sma_value = sum(recent_imbalances) / len(recent_imbalances)
|
||||||
|
|
||||||
|
# Also calculate EMA for better responsiveness
|
||||||
|
if period > 1:
|
||||||
|
# EMA calculation with alpha = 2/(period+1)
|
||||||
|
alpha = 2.0 / (period + 1)
|
||||||
|
ema_value = recent_imbalances[0] # Start with first value
|
||||||
|
for value in recent_imbalances[1:]:
|
||||||
|
ema_value = alpha * value + (1 - alpha) * ema_value
|
||||||
|
# Use EMA for better responsiveness
|
||||||
|
stats[name] = ema_value
|
||||||
|
else:
|
||||||
|
# For 1s, use SMA (no EMA needed)
|
||||||
|
stats[name] = sma_value
|
||||||
|
else:
|
||||||
|
# If not enough data, use available data
|
||||||
|
available_imbalances = imbalances[-min(period, len(imbalances)):]
|
||||||
|
if available_imbalances:
|
||||||
|
if len(available_imbalances) > 1:
|
||||||
|
# Calculate EMA for available data
|
||||||
|
alpha = 2.0 / (len(available_imbalances) + 1)
|
||||||
|
ema_value = available_imbalances[0]
|
||||||
|
for value in available_imbalances[1:]:
|
||||||
|
ema_value = alpha * value + (1 - alpha) * ema_value
|
||||||
|
stats[name] = ema_value
|
||||||
|
else:
|
||||||
|
# Single value, use as is
|
||||||
|
stats[name] = available_imbalances[0]
|
||||||
|
else:
|
||||||
|
stats[name] = 0.0
|
||||||
|
|
||||||
|
# Debug logging to verify MA calculation
|
||||||
if any(value != 0.0 for value in stats.values()):
|
if any(value != 0.0 for value in stats.values()):
|
||||||
logger.debug(f"TEMPLATED DASHBOARD: [CUMULATIVE-IMBALANCE] {symbol}: {stats}")
|
logger.debug(f"TEMPLATED DASHBOARD: [MOVING-AVERAGE-IMBALANCE] {symbol}: {stats} (from {len(imbalances)} snapshots)")
|
||||||
|
|
||||||
return stats
|
return stats
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user