cleanup enhanced orchestrator

This commit is contained in:
Dobromir Popov
2025-06-25 15:57:05 +03:00
parent 8a51fcb70a
commit c6094160d7

View File

@ -58,8 +58,6 @@ from core.trading_executor import TradingExecutor
from web.layout_manager import DashboardLayoutManager
from web.component_manager import DashboardComponentManager
# Enhanced RL components are no longer available - using Basic orchestrator only
ENHANCED_RL_AVAILABLE = False
try:
from core.cob_integration import COBIntegration
@ -143,6 +141,7 @@ class CleanTradingDashboard:
'BTC/USDT': {'last_update': 0, 'data': None, 'updates_count': 0}
}
self.latest_cob_data = {} # Cache for COB integration data
self.cob_predictions = {} # Cache for COB predictions (both ETH and BTC for display)
# Initialize timezone
timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia')
@ -603,7 +602,7 @@ class CleanTradingDashboard:
executed_signals = [signal for signal in self.recent_decisions if signal.get('executed', False)]
if executed_signals:
# Separate by prediction type
# Separate by prediction type
buy_trades = []
sell_trades = []
@ -637,51 +636,51 @@ class CleanTradingDashboard:
# Add EXECUTED BUY trades (large green circles)
if buy_trades:
fig.add_trace(
go.Scatter(
fig.add_trace(
go.Scatter(
x=[t['x'] for t in buy_trades],
y=[t['y'] for t in buy_trades],
mode='markers',
marker=dict(
mode='markers',
marker=dict(
symbol='circle',
size=15,
color='rgba(0, 255, 100, 0.9)',
line=dict(width=3, color='green')
),
name='EXECUTED BUY',
showlegend=True,
showlegend=True,
hovertemplate="<b>EXECUTED BUY TRADE</b><br>" +
"Price: $%{y:.2f}<br>" +
"Time: %{x}<br>" +
"Confidence: %{customdata:.1%}<extra></extra>",
"Price: $%{y:.2f}<br>" +
"Time: %{x}<br>" +
"Confidence: %{customdata:.1%}<extra></extra>",
customdata=[t['confidence'] for t in buy_trades]
),
row=row, col=1
)
),
row=row, col=1
)
# Add EXECUTED SELL trades (large red circles)
if sell_trades:
fig.add_trace(
go.Scatter(
fig.add_trace(
go.Scatter(
x=[t['x'] for t in sell_trades],
y=[t['y'] for t in sell_trades],
mode='markers',
marker=dict(
mode='markers',
marker=dict(
symbol='circle',
size=15,
color='rgba(255, 100, 100, 0.9)',
line=dict(width=3, color='red')
),
name='EXECUTED SELL',
showlegend=True,
showlegend=True,
hovertemplate="<b>EXECUTED SELL TRADE</b><br>" +
"Price: $%{y:.2f}<br>" +
"Time: %{x}<br>" +
"Confidence: %{customdata:.1%}<extra></extra>",
"Price: $%{y:.2f}<br>" +
"Time: %{x}<br>" +
"Confidence: %{customdata:.1%}<extra></extra>",
customdata=[t['confidence'] for t in sell_trades]
),
row=row, col=1
)
),
row=row, col=1
)
except Exception as e:
logger.warning(f"Error adding executed trades to main chart: {e}")
@ -742,36 +741,36 @@ class CleanTradingDashboard:
# Executed buy signals (solid green triangles)
if executed_buys:
fig.add_trace(
go.Scatter(
fig.add_trace(
go.Scatter(
x=[s['x'] for s in executed_buys],
y=[s['y'] for s in executed_buys],
mode='markers',
marker=dict(
mode='markers',
marker=dict(
symbol='triangle-up',
size=10,
size=10,
color='rgba(0, 255, 100, 1.0)',
line=dict(width=2, color='green')
),
name='BUY (Executed)',
showlegend=False,
hovertemplate="<b>BUY EXECUTED</b><br>" +
"Price: $%{y:.2f}<br>" +
"Time: %{x}<br>" +
"Price: $%{y:.2f}<br>" +
"Time: %{x}<br>" +
"Confidence: %{customdata:.1%}<extra></extra>",
customdata=[s['confidence'] for s in executed_buys]
),
row=row, col=1
)
),
row=row, col=1
)
# Pending/non-executed buy signals (hollow green triangles)
if pending_buys:
fig.add_trace(
go.Scatter(
fig.add_trace(
go.Scatter(
x=[s['x'] for s in pending_buys],
y=[s['y'] for s in pending_buys],
mode='markers',
marker=dict(
mode='markers',
marker=dict(
symbol='triangle-up',
size=8,
color='rgba(0, 255, 100, 0.5)',
@ -803,20 +802,20 @@ class CleanTradingDashboard:
mode='markers',
marker=dict(
symbol='triangle-down',
size=10,
size=10,
color='rgba(255, 100, 100, 1.0)',
line=dict(width=2, color='red')
),
name='SELL (Executed)',
showlegend=False,
hovertemplate="<b>SELL EXECUTED</b><br>" +
"Price: $%{y:.2f}<br>" +
"Time: %{x}<br>" +
"Price: $%{y:.2f}<br>" +
"Time: %{x}<br>" +
"Confidence: %{customdata:.1%}<extra></extra>",
customdata=[s['confidence'] for s in executed_sells]
),
row=row, col=1
)
),
row=row, col=1
)
# Pending/non-executed sell signals (hollow red triangles)
if pending_sells:
@ -1031,48 +1030,9 @@ class CleanTradingDashboard:
# Check COB integration in Enhanced orchestrator
if hasattr(self.orchestrator, 'cob_integration'):
cob_integration = getattr(self.orchestrator, 'cob_integration', None)
if cob_integration is not None:
# Get real COB integration statistics
try:
if hasattr(cob_integration, 'get_statistics'):
cob_stats = cob_integration.get_statistics()
if cob_stats:
active_symbols = cob_stats.get('active_symbols', [])
total_updates = cob_stats.get('total_updates', 0)
provider_status = cob_stats.get('provider_status', 'Unknown')
if active_symbols:
status['cob_status'] = f'Enhanced COB Active ({len(active_symbols)} symbols)'
status['active_symbols'] = active_symbols
status['cache_size'] = total_updates
status['provider_status'] = provider_status
else:
status['cob_status'] = 'Enhanced COB Integration Loaded (No Data)'
else:
status['cob_status'] = 'Enhanced COB Integration (Stats Unavailable)'
else:
status['cob_status'] = 'Enhanced COB Integration (No Stats Method)'
except Exception as e:
logger.debug(f"Error getting COB statistics: {e}")
status['cob_status'] = 'Enhanced COB Integration (Error Getting Stats)'
else:
status['cob_status'] = 'Enhanced Orchestrator (COB Integration Not Initialized)'
# Don't log warning here to avoid spam, just info level
logger.debug("Enhanced orchestrator has COB integration attribute but it's None")
else:
status['cob_status'] = 'Enhanced Orchestrator Missing COB Integration'
logger.debug("Enhanced orchestrator available but has no COB integration attribute")
else:
status['cob_status'] = 'Enhanced Orchestrator Missing COB Integration'
logger.debug("Enhanced orchestrator available but has no COB integration attribute")
else:
if not ENHANCED_ORCHESTRATOR_AVAILABLE:
status['cob_status'] = 'Enhanced Orchestrator Not Available'
status['orchestrator_type'] = 'Basic (Enhanced Unavailable)'
else:
status['cob_status'] = 'Basic Orchestrator (No COB Support)'
status['orchestrator_type'] = 'Basic (Enhanced Not Used)'
# Basic orchestrator only - no enhanced COB features
status['cob_status'] = 'Basic Orchestrator (No COB Support)'
status['orchestrator_type'] = 'Basic'
return status
@ -1081,38 +1041,14 @@ class CleanTradingDashboard:
return {'error': str(e), 'cob_status': 'Error Getting Status', 'orchestrator_type': 'Unknown'}
def _get_cob_snapshot(self, symbol: str) -> Optional[Any]:
"""Get COB snapshot for symbol using enhanced orchestrator approach"""
"""Get COB snapshot for symbol - Basic orchestrator has no COB features"""
try:
# Get from Enhanced Orchestrator's COB integration (proper way)
if (ENHANCED_ORCHESTRATOR_AVAILABLE and
hasattr(self.orchestrator, 'cob_integration') and
self.orchestrator.__class__.__name__ == 'EnhancedTradingOrchestrator'):
cob_integration = getattr(self.orchestrator, 'cob_integration', None)
if cob_integration is not None:
# Get real COB snapshot using the proper method
if hasattr(cob_integration, 'get_cob_snapshot'):
snapshot = cob_integration.get_cob_snapshot(symbol)
if snapshot:
logger.debug(f"Retrieved Enhanced COB snapshot for {symbol}")
return snapshot
else:
logger.debug(f"No Enhanced COB data available for {symbol}")
elif hasattr(cob_integration, 'get_consolidated_orderbook'):
# Alternative method name
snapshot = cob_integration.get_consolidated_orderbook(symbol)
if snapshot:
logger.debug(f"Retrieved Enhanced COB orderbook for {symbol}")
return snapshot
else:
logger.warning("Enhanced COB integration has no recognized snapshot method")
else:
logger.debug(f"No Enhanced COB integration available for {symbol}")
# Basic orchestrator has no COB integration
logger.debug(f"No COB integration available for {symbol} (Basic orchestrator)")
return None
except Exception as e:
logger.warning(f"Error getting Enhanced COB snapshot for {symbol}: {e}")
logger.warning(f"Error getting COB snapshot for {symbol}: {e}")
return None
def _get_training_metrics(self) -> Dict:
@ -1176,10 +1112,10 @@ class CleanTradingDashboard:
},
'loss_5ma': cnn_last_loss,
'model_type': 'CNN',
'description': 'Williams Market Structure CNN' + (' (Enhanced Only)' if not is_enhanced else '')
'description': 'Williams Market Structure CNN (Basic Orchestrator - Inactive)'
}
loaded_models['cnn'] = cnn_model_info
# 3. COB RL Model Status - NOT AVAILABLE IN BASIC ORCHESTRATOR
cob_active = False
cob_last_loss = 0.012 # Default loss value
@ -1190,12 +1126,12 @@ class CleanTradingDashboard:
'parameters': 400000000, # 400M optimized (Enhanced COB integration)
'last_prediction': {
'timestamp': datetime.now().strftime('%H:%M:%S'),
'action': 'ENHANCED_COB_INFERENCE' if cob_active else ('INACTIVE' if is_enhanced else 'NOT_AVAILABLE'),
'action': 'INACTIVE',
'confidence': 0.0
},
'loss_5ma': cob_last_loss,
'model_type': 'ENHANCED_COB_RL',
'description': 'Enhanced COB Integration' + (' (Enhanced Only)' if not is_enhanced else ''),
'description': 'Enhanced COB Integration (Basic Orchestrator - Inactive)',
'predictions_count': cob_predictions_count
}
loaded_models['cob_rl'] = cob_model_info
@ -1212,14 +1148,14 @@ class CleanTradingDashboard:
'last_update': datetime.now().strftime('%H:%M:%S'),
'models_loaded': len(loaded_models),
'total_parameters': sum(m['parameters'] for m in loaded_models.values() if m['active']),
'orchestrator_type': 'Enhanced' if is_enhanced else 'Basic'
'orchestrator_type': 'Basic'
}
# COB $1 Buckets (sample data for now)
metrics['cob_buckets'] = self._get_cob_dollar_buckets()
return metrics
except Exception as e:
logger.error(f"Error getting training metrics: {e}")
return {'error': str(e), 'loaded_models': {}, 'training_status': {'active_sessions': 0}}
@ -1272,8 +1208,8 @@ class CleanTradingDashboard:
while True:
try:
# Generate signals for both symbols
for symbol in ['ETH/USDT', 'BTC/USDT']:
# Generate signals for ETH only (ignore BTC)
for symbol in ['ETH/USDT']: # Only ETH signals
try:
# Get current price
current_price = self._get_current_price(symbol)
@ -1371,7 +1307,7 @@ class CleanTradingDashboard:
self.recent_decisions = self.recent_decisions[-20:]
# Log signal generation
logger.info(f"Generated {signal['action']} signal for {signal['symbol']} "
logger.info(f"Generated ETH {signal['action']} signal for {signal['symbol']} "
f"(conf: {signal['confidence']:.2f}, model: {signal.get('model', 'UNKNOWN')})")
# DQN training not available in Basic orchestrator
@ -1720,178 +1656,58 @@ class CleanTradingDashboard:
logger.error(f"Error clearing session: {e}")
def _initialize_cob_integration_proper(self):
"""Initialize COB integration using Enhanced Orchestrator - PROPER APPROACH"""
"""Initialize COB integration - Basic orchestrator has no COB features"""
try:
logger.info("Connecting to COB integration from Enhanced Orchestrator...")
# Check if we have Enhanced Orchestrator
if not ENHANCED_ORCHESTRATOR_AVAILABLE:
logger.error("Enhanced Orchestrator not available - COB integration requires Enhanced Orchestrator")
return
# Check if Enhanced Orchestrator has COB integration
if not hasattr(self.orchestrator, 'cob_integration'):
logger.error("Enhanced Orchestrator has no cob_integration attribute")
return
if self.orchestrator.cob_integration is None:
logger.warning("Enhanced Orchestrator COB integration is None - needs to be started")
# Try to start the COB integration asynchronously
def start_cob_async():
"""Start COB integration in async context"""
import asyncio
async def _start_cob():
try:
# Start the COB integration from enhanced orchestrator
await self.orchestrator.start_cob_integration()
logger.info("COB integration started successfully from Enhanced Orchestrator")
# Register dashboard callback if possible
if hasattr(self.orchestrator.cob_integration, 'add_dashboard_callback'):
self.orchestrator.cob_integration.add_dashboard_callback(self._on_enhanced_cob_update)
logger.info("Registered dashboard callback with Enhanced COB integration")
except Exception as e:
logger.error(f"Error starting COB integration from Enhanced Orchestrator: {e}")
# Run in new event loop if needed
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# If loop is already running, schedule as task
asyncio.create_task(_start_cob())
else:
# If no loop running, run directly
loop.run_until_complete(_start_cob())
except RuntimeError:
# No event loop, create new one
asyncio.run(_start_cob())
# Start COB integration in background thread to avoid blocking dashboard
import threading
cob_start_thread = threading.Thread(target=start_cob_async, daemon=True)
cob_start_thread.start()
logger.info("Enhanced COB integration startup initiated in background")
else:
# COB integration already exists, just register callback
cob_integration = self.orchestrator.cob_integration
logger.info(f"Enhanced COB integration found: {type(cob_integration)}")
# Register callbacks if available
if hasattr(cob_integration, 'add_dashboard_callback'):
cob_integration.add_dashboard_callback(self._on_enhanced_cob_update)
logger.info("Registered dashboard callback with existing Enhanced COB integration")
# Verify COB integration is active and working
if hasattr(cob_integration, 'get_statistics'):
try:
stats = cob_integration.get_statistics()
logger.info(f"Enhanced COB statistics: {stats}")
except Exception as e:
logger.debug(f"Could not get COB statistics: {e}")
logger.info("Enhanced COB integration connection completed")
logger.info("NO SIMULATION - Using Enhanced Orchestrator real market data only")
logger.info("Basic orchestrator has no COB integration features")
logger.info("COB integration not available with Basic orchestrator")
except Exception as e:
logger.error(f"CRITICAL: Failed to connect to Enhanced COB integration: {e}")
logger.error("Dashboard will operate without COB data")
logger.error(f"Error in COB integration init: {e}")
def _on_enhanced_cob_update(self, symbol: str, cob_data: Dict):
"""Handle Enhanced COB data updates - NO SIMULATION"""
"""Handle Enhanced COB data updates - Basic orchestrator has no COB features"""
try:
# Process Enhanced COB data update
current_time = time.time()
# Update cache with Enhanced COB data (same format as cob_realtime_dashboard.py)
if symbol not in self.cob_cache:
self.cob_cache[symbol] = {'last_update': 0, 'data': None, 'updates_count': 0}
self.cob_cache[symbol] = {
'last_update': current_time,
'data': cob_data,
'updates_count': self.cob_cache[symbol].get('updates_count', 0) + 1
}
# Also update latest_cob_data for compatibility
self.latest_cob_data[symbol] = cob_data
# Log Enhanced COB data updates
update_count = self.cob_cache[symbol]['updates_count']
if update_count % 50 == 0: # Every 50 Enhanced updates
logger.info(f"[ENHANCED-COB] {symbol} - Enhanced update #{update_count}")
logger.debug("Enhanced COB updates not available with Basic orchestrator")
except Exception as e:
logger.error(f"Error handling Enhanced COB update for {symbol}: {e}")
logger.error(f"Error handling COB update for {symbol}: {e}")
def _start_cob_data_subscription(self):
"""Start COB data subscription with proper caching"""
"""Start COB data subscription - Basic orchestrator has no COB features"""
try:
# Start the COB RL trader asynchronously
import asyncio
def start_cob_trader():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self.cob_rl_trader.start())
logger.info("COB RL trader started successfully")
except Exception as e:
logger.error(f"Error in COB trader loop: {e}")
finally:
loop.close()
# Start in separate thread to avoid blocking
import threading
cob_thread = threading.Thread(target=start_cob_trader, daemon=True)
cob_thread.start()
logger.info("COB data subscription not available with Basic orchestrator")
except Exception as e:
logger.error(f"Error starting COB data subscription: {e}")
logger.error(f"Error in COB subscription: {e}")
def _on_cob_prediction(self, prediction: PredictionResult):
"""Handle COB RL predictions"""
"""Handle COB RL predictions - Display both ETH and BTC for reference"""
try:
with self.cob_lock:
# Convert prediction to dashboard format
prediction_data = {
'timestamp': prediction.timestamp,
'direction': prediction.predicted_direction, # 0=DOWN, 1=SIDEWAYS, 2=UP
'confidence': prediction.confidence,
'predicted_change': prediction.predicted_change,
'direction_text': ['DOWN', 'SIDEWAYS', 'UP'][prediction.predicted_direction],
'color': ['red', 'gray', 'green'][prediction.predicted_direction]
}
# Convert prediction to dashboard format
prediction_data = {
'timestamp': prediction.timestamp,
'direction': prediction.predicted_direction, # 0=DOWN, 1=SIDEWAYS, 2=UP
'confidence': prediction.confidence,
'predicted_change': prediction.predicted_change,
'direction_text': ['DOWN', 'SIDEWAYS', 'UP'][prediction.predicted_direction],
'color': ['red', 'gray', 'green'][prediction.predicted_direction]
}
# Add predictions to cache for both ETH and BTC (for reference/display)
if hasattr(prediction, 'symbol') and prediction.symbol:
symbol = prediction.symbol
# Store predictions for display (both ETH and BTC)
if symbol not in self.cob_predictions:
self.cob_predictions[symbol] = deque(maxlen=100)
# Add to predictions cache
self.cob_predictions[prediction.symbol].append(prediction_data)
self.cob_predictions[symbol].append(prediction_data)
# Cache COB data (1s buckets for 1 day max, 5 min retention)
current_time = datetime.now()
cob_data = {
'timestamp': current_time,
'prediction': prediction_data,
'features': prediction.features.tolist() if prediction.features is not None else []
}
# Add to 1d cache (1s buckets)
self.cob_data_cache_1d[prediction.symbol].append(cob_data)
# Add to raw ticks cache (15 seconds max, 10+ updates/sec)
self.cob_raw_ticks[prediction.symbol].append({
'timestamp': current_time,
'prediction': prediction_data,
'raw_features': prediction.features.tolist() if prediction.features is not None else []
})
logger.debug(f"COB prediction cached for {prediction.symbol}: "
# Log all predictions but note that only ETH generates trading signals
signal_note = " (TRADING ENABLED)" if 'ETH' in symbol.upper() else " (REFERENCE ONLY)"
logger.debug(f"COB prediction cached for {symbol}{signal_note}: "
f"{prediction_data['direction_text']} (confidence: {prediction.confidence:.3f})")
except Exception as e:
logger.error(f"Error handling COB prediction: {e}")
def _connect_to_orchestrator(self):
"""Connect to orchestrator for real trading signals"""
try:
@ -1905,8 +1721,20 @@ class CleanTradingDashboard:
logger.error(f"Error connecting to orchestrator: {e}")
def _on_trading_decision(self, decision):
"""Handle trading decision from orchestrator"""
"""Handle trading decision from orchestrator - Filter to show only ETH signals"""
try:
# Check if this decision is for ETH/USDT - ignore all BTC signals
symbol = None
if hasattr(decision, 'symbol'):
symbol = decision.symbol
elif isinstance(decision, dict) and 'symbol' in decision:
symbol = decision.get('symbol')
# Only process ETH signals, ignore BTC
if symbol and 'BTC' in symbol.upper():
logger.debug(f"Ignoring BTC signal: {symbol}")
return
# Convert orchestrator decision to dashboard format
# Handle both TradingDecision objects and dictionary formats
if hasattr(decision, 'action'):
@ -1916,6 +1744,7 @@ class CleanTradingDashboard:
'action': decision.action,
'confidence': decision.confidence,
'price': decision.price,
'symbol': getattr(decision, 'symbol', 'ETH/USDT'), # Add symbol field
'executed': True, # Orchestrator decisions are executed
'blocked': False,
'manual': False
@ -1927,17 +1756,24 @@ class CleanTradingDashboard:
'action': decision.get('action', 'UNKNOWN'),
'confidence': decision.get('confidence', 0),
'price': decision.get('price', 0),
'symbol': decision.get('symbol', 'ETH/USDT'), # Add symbol field
'executed': True, # Orchestrator decisions are executed
'blocked': False,
'manual': False
}
# Add to recent decisions
self.recent_decisions.append(dashboard_decision)
# Keep only last 50 decisions
if len(self.recent_decisions) > 50:
self.recent_decisions = self.recent_decisions[-50:]
# Only show ETH signals in dashboard
if dashboard_decision['symbol'] and 'ETH' in dashboard_decision['symbol'].upper():
# Add to recent decisions
self.recent_decisions.append(dashboard_decision)
# Keep only last 50 decisions
if len(self.recent_decisions) > 50:
self.recent_decisions = self.recent_decisions[-50:]
logger.info(f"ETH signal added to dashboard: {dashboard_decision['action']} (conf: {dashboard_decision['confidence']:.2f})")
else:
logger.debug(f"Non-ETH signal ignored: {dashboard_decision.get('symbol', 'UNKNOWN')}")
except Exception as e:
logger.error(f"Error handling trading decision: {e}")
@ -2130,9 +1966,9 @@ class CleanTradingDashboard:
self.tick_cache = self.tick_cache[-1000:]
if 'ohlcv' in data_packet:
# Update multi-timeframe data
# Update multi-timeframe data for both ETH and BTC (BTC for reference)
multi_tf_data = data_packet.get('multi_timeframe', {})
for symbol in ['ETH/USDT', 'BTC/USDT']:
for symbol in ['ETH/USDT', 'BTC/USDT']: # Process both ETH and BTC data
if symbol in multi_tf_data:
for timeframe in ['1s', '1m', '1h', '1d']:
if timeframe in multi_tf_data[symbol]:
@ -2321,4 +2157,4 @@ def create_clean_dashboard(data_provider: Optional[DataProvider] = None, orchest
data_provider=data_provider,
orchestrator=orchestrator,
trading_executor=trading_executor
)
)