template dash using real integrations (wip)

This commit is contained in:
Dobromir Popov
2025-07-02 03:05:11 +03:00
parent 03573cfb56
commit 29e4076638
2 changed files with 762 additions and 79 deletions

View File

@ -271,15 +271,15 @@
], ],
"decision": [ "decision": [
{ {
"checkpoint_id": "decision_20250702_013257", "checkpoint_id": "decision_20250702_020007",
"model_name": "decision", "model_name": "decision",
"model_type": "decision_fusion", "model_type": "decision_fusion",
"file_path": "NN\\models\\saved\\decision\\decision_20250702_013257.pt", "file_path": "NN\\models\\saved\\decision\\decision_20250702_020007.pt",
"created_at": "2025-07-02T01:32:57.057698", "created_at": "2025-07-02T02:00:07.439094",
"file_size_mb": 0.06720924377441406, "file_size_mb": 0.06720924377441406,
"performance_score": 9.99999352005137, "performance_score": 9.999997759969705,
"accuracy": null, "accuracy": null,
"loss": 6.479948628599987e-06, "loss": 2.240030294586859e-06,
"val_accuracy": null, "val_accuracy": null,
"val_loss": null, "val_loss": null,
"reward": null, "reward": null,
@ -291,15 +291,15 @@
"wandb_artifact_name": null "wandb_artifact_name": null
}, },
{ {
"checkpoint_id": "decision_20250702_013256", "checkpoint_id": "decision_20250702_020007",
"model_name": "decision", "model_name": "decision",
"model_type": "decision_fusion", "model_type": "decision_fusion",
"file_path": "NN\\models\\saved\\decision\\decision_20250702_013256.pt", "file_path": "NN\\models\\saved\\decision\\decision_20250702_020007.pt",
"created_at": "2025-07-02T01:32:56.667169", "created_at": "2025-07-02T02:00:07.707012",
"file_size_mb": 0.06720924377441406, "file_size_mb": 0.06720924377441406,
"performance_score": 9.999993471487318, "performance_score": 9.999997758801166,
"accuracy": null, "accuracy": null,
"loss": 6.528512681061979e-06, "loss": 2.2411988334327916e-06,
"val_accuracy": null, "val_accuracy": null,
"val_loss": null, "val_loss": null,
"reward": null, "reward": null,
@ -311,15 +311,15 @@
"wandb_artifact_name": null "wandb_artifact_name": null
}, },
{ {
"checkpoint_id": "decision_20250702_013255", "checkpoint_id": "decision_20250702_020007",
"model_name": "decision", "model_name": "decision",
"model_type": "decision_fusion", "model_type": "decision_fusion",
"file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt", "file_path": "NN\\models\\saved\\decision\\decision_20250702_020007.pt",
"created_at": "2025-07-02T01:32:55.915359", "created_at": "2025-07-02T02:00:07.570949",
"file_size_mb": 0.06720924377441406, "file_size_mb": 0.06720924377441406,
"performance_score": 9.999993469737547, "performance_score": 9.999997757764104,
"accuracy": null, "accuracy": null,
"loss": 6.5302624539599814e-06, "loss": 2.2422358958193754e-06,
"val_accuracy": null, "val_accuracy": null,
"val_loss": null, "val_loss": null,
"reward": null, "reward": null,
@ -331,15 +331,15 @@
"wandb_artifact_name": null "wandb_artifact_name": null
}, },
{ {
"checkpoint_id": "decision_20250702_013255", "checkpoint_id": "decision_20250702_020007",
"model_name": "decision", "model_name": "decision",
"model_type": "decision_fusion", "model_type": "decision_fusion",
"file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt", "file_path": "NN\\models\\saved\\decision\\decision_20250702_020007.pt",
"created_at": "2025-07-02T01:32:55.774316", "created_at": "2025-07-02T02:00:07.867047",
"file_size_mb": 0.06720924377441406, "file_size_mb": 0.06720924377441406,
"performance_score": 9.99999346914947, "performance_score": 9.999997757753505,
"accuracy": null, "accuracy": null,
"loss": 6.530850530594989e-06, "loss": 2.2422464945511442e-06,
"val_accuracy": null, "val_accuracy": null,
"val_loss": null, "val_loss": null,
"reward": null, "reward": null,
@ -351,15 +351,15 @@
"wandb_artifact_name": null "wandb_artifact_name": null
}, },
{ {
"checkpoint_id": "decision_20250702_013255", "checkpoint_id": "decision_20250702_020007",
"model_name": "decision", "model_name": "decision",
"model_type": "decision_fusion", "model_type": "decision_fusion",
"file_path": "NN\\models\\saved\\decision\\decision_20250702_013255.pt", "file_path": "NN\\models\\saved\\decision\\decision_20250702_020007.pt",
"created_at": "2025-07-02T01:32:55.646001", "created_at": "2025-07-02T02:00:07.302999",
"file_size_mb": 0.06720924377441406, "file_size_mb": 0.06720924377441406,
"performance_score": 9.99999346889822, "performance_score": 9.999997754320662,
"accuracy": null, "accuracy": null,
"loss": 6.531101780155828e-06, "loss": 2.245679338091438e-06,
"val_accuracy": null, "val_accuracy": null,
"val_loss": null, "val_loss": null,
"reward": null, "reward": null,

View File

@ -3,20 +3,33 @@ Template-based Trading Dashboard
Uses MVC architecture with HTML templates and data models Uses MVC architecture with HTML templates and data models
""" """
import logging import logging
from typing import Optional, Any, Dict, List import sys
from datetime import datetime import os
from typing import Optional, Any, Dict, List, Deque
from datetime import datetime, timedelta
import pandas as pd import pandas as pd
import pytz
import time
import threading
from collections import deque
from dataclasses import asdict
import dash import dash
from dash import dcc, html, Input, Output, State, callback_context from dash import dcc, html, Input, Output, State, callback_context
import plotly.graph_objects as go import plotly.graph_objects as go
import plotly.express as px import plotly.express as px
from .dashboard_model import DashboardModel, DashboardDataBuilder, create_sample_dashboard_data
from .template_renderer import DashboardTemplateRenderer
from core.data_provider import DataProvider from core.data_provider import DataProvider
from core.orchestrator import TradingOrchestrator from core.orchestrator import TradingOrchestrator
from core.trading_executor import TradingExecutor from core.trading_executor import TradingExecutor
from core.config import get_config
from core.unified_data_stream import UnifiedDataStream
from web.dashboard_model import DashboardModel, DashboardDataBuilder, create_sample_dashboard_data
from web.template_renderer import DashboardTemplateRenderer
from web.component_manager import DashboardComponentManager
from web.layout_manager import DashboardLayoutManager
from utils.checkpoint_manager import save_checkpoint, load_best_checkpoint
from NN.models.advanced_transformer_trading import create_trading_transformer, TradingTransformerConfig
# Configure logging # Configure logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -29,29 +42,125 @@ class TemplatedTradingDashboard:
orchestrator: Optional[TradingOrchestrator] = None, orchestrator: Optional[TradingOrchestrator] = None,
trading_executor: Optional[TradingExecutor] = None): trading_executor: Optional[TradingExecutor] = None):
"""Initialize the templated dashboard""" """Initialize the templated dashboard"""
self.data_provider = data_provider self.config = get_config()
self.orchestrator = orchestrator
self.trading_executor = trading_executor # Initialize components
self.data_provider = data_provider or DataProvider()
self.trading_executor = trading_executor or TradingExecutor()
# Initialize template renderer # Initialize template renderer
self.renderer = DashboardTemplateRenderer() self.renderer = DashboardTemplateRenderer()
# Initialize Dash app # Initialize unified orchestrator with full ML capabilities
if orchestrator is None:
self.orchestrator = TradingOrchestrator(
data_provider=self.data_provider,
enhanced_rl_training=True,
model_registry={}
)
logger.info("TEMPLATED DASHBOARD: Using unified Trading Orchestrator with full ML capabilities")
else:
self.orchestrator = orchestrator
# Initialize enhanced training system for predictions
self.training_system = None
self._initialize_enhanced_training_system()
# Initialize layout and component managers
self.layout_manager = DashboardLayoutManager(
starting_balance=self._get_initial_balance(),
trading_executor=self.trading_executor
)
self.component_manager = DashboardComponentManager()
# Initialize Universal Data Stream for the 5 timeseries architecture
self.unified_stream = UnifiedDataStream(self.data_provider, self.orchestrator)
self.stream_consumer_id = self.unified_stream.register_consumer(
consumer_name="TemplatedTradingDashboard",
callback=self._handle_unified_stream_data,
data_types=['ticks', 'ohlcv', 'training_data', 'ui_data']
)
logger.info(f"TEMPLATED DASHBOARD: Universal Data Stream initialized with consumer ID: {self.stream_consumer_id}")
logger.info("TEMPLATED DASHBOARD: Subscribed to Universal 5 Timeseries: ETH(ticks,1m,1h,1d) + BTC(ticks)")
# Dashboard state
self.recent_decisions: list = []
self.closed_trades: list = []
self.current_prices: dict = {}
self.session_pnl = 0.0
self.total_fees = 0.0
self.current_position: Optional[float] = 0.0
self.session_trades: list = []
# Model control toggles - separate inference and training
self.dqn_inference_enabled = True # Default: enabled
self.dqn_training_enabled = True # Default: enabled
self.cnn_inference_enabled = True
self.cnn_training_enabled = True
# Leverage management - adjustable x1 to x100
self.current_leverage = 50 # Default x50 leverage
self.min_leverage = 1
self.max_leverage = 100
self.pending_trade_case_id = None # For tracking opening trades until closure
# WebSocket streaming
self.ws_price_cache: dict = {}
self.is_streaming = False
self.tick_cache: list = []
# COB data cache - enhanced with price buckets and memory system
self.cob_cache: dict = {
'ETH/USDT': {'last_update': 0, 'data': None, 'updates_count': 0},
'BTC/USDT': {'last_update': 0, 'data': None, 'updates_count': 0}
}
self.latest_cob_data: dict = {} # Cache for COB integration data
self.cob_predictions: dict = {} # Cache for COB predictions (both ETH and BTC for display)
# COB High-frequency data handling (50-100 updates/sec)
self.cob_data_buffer: dict = {} # Buffer for high-freq data
self.cob_memory: dict = {} # Memory system like GPT - keeps last N snapshots
self.cob_price_buckets: dict = {} # Price bucket cache
self.cob_update_count = 0
self.last_cob_broadcast: Dict[str, Optional[float]] = {'ETH/USDT': None, 'BTC/USDT': None} # Rate limiting for UI updates, updated type
self.cob_data_history: Dict[str, Deque[Any]] = {
'ETH/USDT': deque(maxlen=61), # Store ~60 seconds of 1s snapshots
'BTC/USDT': deque(maxlen=61)
}
# Initialize timezone
timezone_name = self.config.get('system', {}).get('timezone', 'Europe/Sofia')
self.timezone = pytz.timezone(timezone_name)
# Create Dash app
self.app = dash.Dash(__name__, external_stylesheets=[ self.app = dash.Dash(__name__, external_stylesheets=[
'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css' 'https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css',
'https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css'
]) ])
# Session data # Suppress Dash development mode logging
self.session_start_time = datetime.now() self.app.enable_dev_tools(debug=False, dev_tools_silence_routes_logging=True)
self.session_trades = []
self.session_pnl = 0.0
self.current_position = 0.0
# Setup layout and callbacks # Setup layout and callbacks
self._setup_layout() self._setup_layout()
self._setup_callbacks() self._setup_callbacks()
logger.info("TEMPLATED DASHBOARD: Initialized with MVC architecture") # Start data streams
self._initialize_streaming()
# Connect to orchestrator for real trading signals
self._connect_to_orchestrator()
# Initialize COB integration with high-frequency data handling
self._initialize_cob_integration()
# Start signal generation loop to ensure continuous trading signals
self._start_signal_generation_loop()
# Start training sessions if models are showing FRESH status
threading.Thread(target=self._delayed_training_check, daemon=True).start()
logger.info("TEMPLATED DASHBOARD: Initialized with HIGH-FREQUENCY COB integration and signal generation")
def _setup_layout(self): def _setup_layout(self):
"""Setup the dashboard layout using templates""" """Setup the dashboard layout using templates"""
@ -65,7 +174,16 @@ class TemplatedTradingDashboard:
self.app.layout = layout self.app.layout = layout
def _get_initial_balance(self) -> float:
"""Get initial balance from trading executor or default"""
try:
if self.trading_executor and hasattr(self.trading_executor, 'starting_balance'):
balance = getattr(self.trading_executor, 'starting_balance', None)
if balance and balance > 0:
return balance
except Exception as e:
logger.warning(f"Error getting balance: {e}")
return 100.0 # Default balance
def _setup_callbacks(self): def _setup_callbacks(self):
"""Setup dashboard callbacks""" """Setup dashboard callbacks"""
@ -162,7 +280,8 @@ class TemplatedTradingDashboard:
def update_closed_trades(n): def update_closed_trades(n):
"""Update closed trades table""" """Update closed trades table"""
try: try:
return self._render_closed_trades() # Return the table wrapped in a Div
return html.Div(self._render_closed_trades())
except Exception as e: except Exception as e:
logger.error(f"Error updating closed trades: {e}") logger.error(f"Error updating closed trades: {e}")
return html.Div("No trades") return html.Div("No trades")
@ -465,44 +584,32 @@ class TemplatedTradingDashboard:
]) ])
]) ])
def _render_closed_trades(self) -> html.Table: def _render_closed_trades(self) -> html.Div:
"""Render closed trades table""" """Render closed trades table"""
return html.Table([ if not self.closed_trades:
html.Thead([ return html.Div("No closed trades yet.", className="alert alert-info mt-3")
html.Tr([
html.Th("Time"), # Create a DataFrame from closed trades
html.Th("Symbol"), df_trades = pd.DataFrame(self.closed_trades)
html.Th("Side"),
html.Th("Size"), # Format columns for display
html.Th("Entry"), df_trades['timestamp'] = pd.to_datetime(df_trades['timestamp']).dt.strftime('%Y-%m-%d %H:%M:%S')
html.Th("Exit"), df_trades['entry_price'] = df_trades['entry_price'].apply(lambda x: f"${x:,.2f}")
html.Th("PnL"), df_trades['exit_price'] = df_trades['exit_price'].apply(lambda x: f"${x:,.2f}")
html.Th("Duration") df_trades['pnl'] = df_trades['pnl'].apply(lambda x: f"${x:,.2f}")
]) df_trades['profit_percentage'] = df_trades['profit_percentage'].apply(lambda x: f"{x:,.2f}%")
]), df_trades['size'] = df_trades['size'].apply(lambda x: f"{x:,.4f}")
html.Tbody([ df_trades['fees'] = df_trades['fees'].apply(lambda x: f"${x:,.2f}")
html.Tr([
html.Td("14:23:45"), table_header = [html.Thead(html.Tr([html.Th(col) for col in df_trades.columns]))]
html.Td("ETH/USDT"), table_body = [html.Tbody([
html.Td([html.Span("BUY", className="badge bg-success")]), html.Tr([html.Td(df_trades.iloc[i][col]) for col in df_trades.columns]) for i in range(len(df_trades))
html.Td("1.5"), ])]
html.Td("$3420.45"),
html.Td("$3428.12"), return html.Div(
html.Td("$11.51", className="trade-profit"), html.Table(table_header + table_body, className="table table-striped table-hover table-sm"),
html.Td("2m 34s") className="table-responsive"
]), )
html.Tr([
html.Td("14:21:12"),
html.Td("BTC/USDT"),
html.Td([html.Span("SELL", className="badge bg-danger")]),
html.Td("0.1"),
html.Td("$45150.23"),
html.Td("$45142.67"),
html.Td("-$0.76", className="trade-loss"),
html.Td("1m 12s")
])
])
], className="table table-sm")
def _execute_manual_trade(self, action: str): def _execute_manual_trade(self, action: str):
"""Execute manual trade""" """Execute manual trade"""
@ -532,6 +639,582 @@ class TemplatedTradingDashboard:
logger.info(f"TEMPLATED DASHBOARD: Starting at http://{host}:{port}") logger.info(f"TEMPLATED DASHBOARD: Starting at http://{host}:{port}")
self.app.run(host=host, port=port, debug=debug) self.app.run(host=host, port=port, debug=debug)
def _handle_unified_stream_data(self, data):
"""Placeholder for unified stream data handling."""
logger.debug(f"Received data from unified stream: {data}")
def _delayed_training_check(self):
"""Check and start training after a delay to allow initialization"""
try:
time.sleep(10) # Wait 10 seconds for initialization
logger.info("Checking if models need training activation...")
self._start_actual_training_if_needed()
except Exception as e:
logger.error(f"Error in delayed training check: {e}")
def _initialize_enhanced_training_system(self):
"""Initialize enhanced training system for model predictions"""
try:
# Try to import and initialize enhanced training system
from enhanced_realtime_training import EnhancedRealtimeTrainingSystem
self.training_system = EnhancedRealtimeTrainingSystem(
orchestrator=self.orchestrator,
data_provider=self.data_provider,
dashboard=self
)
# Initialize prediction storage
if not hasattr(self.orchestrator, 'recent_dqn_predictions'):
self.orchestrator.recent_dqn_predictions = {}
if not hasattr(self.orchestrator, 'recent_cnn_predictions'):
self.orchestrator.recent_cnn_predictions = {}
logger.info("TEMPLATED DASHBOARD: Enhanced training system initialized for model predictions")
except ImportError:
logger.warning("TEMPLATED DASHBOARD: Enhanced training system not available - using mock predictions")
self.training_system = None
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Error initializing enhanced training system: {e}")
self.training_system = None
def _initialize_streaming(self):
"""Initialize data streaming"""
try:
self._start_websocket_streaming()
self._start_data_collection()
logger.info("TEMPLATED DASHBOARD: Data streaming initialized")
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Error initializing streaming: {e}")
def _start_websocket_streaming(self):
"""Start WebSocket streaming for real-time data."""
ws_thread = threading.Thread(target=self._ws_worker, daemon=True)
ws_thread.start()
def _ws_worker(self):
try:
import websocket
import json # Added import
def on_message(ws, message):
try:
data = json.loads(message)
if 'k' in data:
kline = data['k']
tick_record = {
'symbol': 'ETHUSDT',
'datetime': datetime.fromtimestamp(int(kline['t']) / 1000),
'open': float(kline['o']),
'high': float(kline['h']),
'low': float(kline['l']),
'close': float(kline['c']),
'price': float(kline['c']),
'volume': float(kline['v']),
}
self.ws_price_cache['ETHUSDT'] = tick_record['price']
self.current_prices['ETH/USDT'] = tick_record['price']
self.tick_cache.append(tick_record)
if len(self.tick_cache) > 1000:
self.tick_cache.pop(0)
except Exception as e:
logger.warning(f"TEMPLATED DASHBOARD: WebSocket message error: {e}")
def on_error(ws, error):
logger.error(f"TEMPLATED DASHBOARD: WebSocket error: {error}")
self.is_streaming = False
def on_close(ws, close_status_code, close_msg):
logger.warning("TEMPLATED DASHBOARD: WebSocket connection closed")
self.is_streaming = False
def on_open(ws):
logger.info("TEMPLATED DASHBOARD: WebSocket connected")
self.is_streaming = True
ws_url = "wss://stream.binance.com:9443/ws/ethusdt@kline_1s"
ws = websocket.WebSocketApp(ws_url, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
ws.run_forever()
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: WebSocket worker error: {e}")
self.is_streaming = False
def _start_data_collection(self):
"""Start background data collection"""
data_thread = threading.Thread(target=self._data_worker, daemon=True)
data_thread.start()
def _data_worker(self):
while True:
try:
self._update_session_metrics()
time.sleep(5)
except Exception as e:
logger.warning(f"TEMPLATED DASHBOARD: Data collection error: {e}")
time.sleep(10)
def _update_session_metrics(self):
"""Update session P&L and total fees from closed trades."""
try:
closed_trades = []
if self.trading_executor and hasattr(self.trading_executor, 'get_closed_trades'):
closed_trades = self.trading_executor.get_closed_trades()
self.closed_trades = closed_trades
if closed_trades:
self.session_pnl = sum(trade.get('pnl', 0) for trade in closed_trades)
self.total_fees = sum(trade.get('fees', 0) for trade in closed_trades)
else:
self.session_pnl = 0.0
self.total_fees = 0.0
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Error updating session metrics: {e}")
def _connect_to_orchestrator(self):
"""Connect to orchestrator for real trading signals"""
try:
if self.orchestrator and hasattr(self.orchestrator, 'add_decision_callback'):
import asyncio # Added import
# from dataclasses import asdict # Moved asdict to top-level import
def connect_worker():
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# No need to run_until_complete here, just register the callback
self.orchestrator.add_decision_callback(self._on_trading_decision)
logger.info("TEMPLATED DASHBOARD: Successfully connected to orchestrator for trading signals.")
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Orchestrator connection worker failed: {e}")
thread = threading.Thread(target=connect_worker, daemon=True)
thread.start()
else:
logger.warning("TEMPLATED DASHBOARD: Orchestrator not available or doesn\'t support callbacks")
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Error initiating orchestrator connection: {e}")
async def _on_trading_decision(self, decision):
"""Handle trading decision from orchestrator."""
try:
action = getattr(decision, 'action', decision.get('action'))
if action == 'HOLD':
return
symbol = getattr(decision, 'symbol', decision.get('symbol', 'ETH/USDT'))
if 'ETH' not in symbol.upper():
return
dashboard_decision = asdict(decision) if not isinstance(decision, dict) else decision.copy()
dashboard_decision['timestamp'] = datetime.now()
dashboard_decision['executed'] = False
self.recent_decisions.append(dashboard_decision)
if len(self.recent_decisions) > 200:
self.recent_decisions.pop(0)
logger.info(f"TEMPLATED DASHBOARD: [ORCHESTRATOR SIGNAL] Received: {action} for {symbol}")
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Error handling trading decision: {e}")
def _initialize_cob_integration(self):
"""Initialize simple COB integration that works without async event loops"""
try:
logger.info("TEMPLATED DASHBOARD: Initializing simple COB integration for model feeding")
# Initialize COB data storage
self.cob_bucketed_data = {
'ETH/USDT': {},
'BTC/USDT': {}
}
self.cob_last_update: Dict[str, Optional[float]] = {
'ETH/USDT': None,
'BTC/USDT': None
} # Corrected type hint
# Start simple COB data collection
self._start_simple_cob_collection()
logger.info("TEMPLATED DASHBOARD: Simple COB integration initialized successfully")
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Error initializing COB integration: {e}")
self.cob_integration = None
def _start_simple_cob_collection(self):
"""Start simple COB data collection using REST APIs (no async required)"""
try:
# threading and time already imported
def cob_collector():
"""Collect COB data using simple REST API calls"""
while True:
try:
# Collect data for both symbols
for symbol in ['ETH/USDT', 'BTC/USDT']:
self._collect_simple_cob_data(symbol)
# Sleep for 1 second between collections
time.sleep(1)
except Exception as e:
logger.debug(f"TEMPLATED DASHBOARD: Error in COB collection: {e}")
time.sleep(5) # Wait longer on error
# Start collector in background thread
cob_thread = threading.Thread(target=cob_collector, daemon=True)
cob_thread.start()
logger.info("TEMPLATED DASHBOARD: Simple COB data collection started")
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Error starting COB collection: {e}")
def _collect_simple_cob_data(self, symbol: str):
"""Collect simple COB data using Binance REST API"""
try:
import requests # Added import
# time already imported
# Use Binance REST API for order book data
binance_symbol = symbol.replace('/', '')
url = f"https://api.binance.com/api/v3/depth?symbol={binance_symbol}&limit=500"
response = requests.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
# Process order book data
bids = []
asks = []
# Process bids (buy orders)
for bid in data['bids'][:100]: # Top 100 levels
price = float(bid[0])
size = float(bid[1])
bids.append({
'price': price,
'size': size,
'total': price * size
})
# Process asks (sell orders)
for ask in data['asks'][:100]: # Top 100 levels
price = float(ask[0])
size = float(ask[1])
asks.append({
'price': price,
'size': size,
'total': price * size
})
# Calculate statistics
if bids and asks:
best_bid = max(bids, key=lambda x: x['price'])
best_ask = min(asks, key=lambda x: x['price'])
mid_price = (best_bid['price'] + best_ask['price']) / 2
spread_bps = ((best_ask['price'] - best_bid['price']) / mid_price) * 10000 if mid_price > 0 else 0
total_bid_liquidity = sum(bid['total'] for bid in bids[:20])
total_ask_liquidity = sum(ask['total'] for ask in asks[:20])
total_liquidity = total_bid_liquidity + total_ask_liquidity
imbalance = (total_bid_liquidity - total_ask_liquidity) / total_liquidity if total_liquidity > 0 else 0
# Create COB snapshot
cob_snapshot = {
'symbol': symbol,
'timestamp': time.time(),
'bids': bids,
'asks': asks,
'stats': {
'mid_price': mid_price,
'spread_bps': spread_bps,
'total_bid_liquidity': total_bid_liquidity,
'total_ask_liquidity': total_ask_liquidity,
'imbalance': imbalance,
'exchanges_active': ['Binance']
}
}
# Store in history (keep last 15 seconds)
self.cob_data_history[symbol].append(cob_snapshot)
if len(self.cob_data_history[symbol]) > 15: # Keep 15 seconds
# Use slicing to remove old elements from deque to ensure correct behavior
while len(self.cob_data_history[symbol]) > 15:
self.cob_data_history[symbol].popleft()
# Update latest data
self.latest_cob_data[symbol] = cob_snapshot
self.cob_last_update[symbol] = time.time()
# Generate bucketed data for models
self._generate_bucketed_cob_data(symbol, cob_snapshot)
logger.debug(f"TEMPLATED DASHBOARD: COB data collected for {symbol}: {len(bids)} bids, {len(asks)} asks")
except Exception as e:
logger.debug(f"TEMPLATED DASHBOARD: Error collecting COB data for {symbol}: {e}")
def _generate_bucketed_cob_data(self, symbol: str, cob_snapshot: dict):
"""Generate bucketed COB data for model feeding"""
try:
# Create price buckets (1 basis point granularity)
bucket_size_bps = 1.0
mid_price = cob_snapshot['stats']['mid_price']
# Initialize buckets
buckets = {}
# Process bids into buckets
for bid in cob_snapshot['bids']:
price_offset_bps = ((bid['price'] - mid_price) / mid_price) * 10000
bucket_key = int(price_offset_bps / bucket_size_bps)
if bucket_key not in buckets:
buckets[bucket_key] = {'bid_volume': 0, 'ask_volume': 0}
buckets[bucket_key]['bid_volume'] += bid['total']
# Process asks into buckets
for ask in cob_snapshot['asks']:
price_offset_bps = ((ask['price'] - mid_price) / mid_price) * 10000
bucket_key = int(price_offset_bps / bucket_size_bps)
if bucket_key not in buckets:
buckets[bucket_key] = {'bid_volume': 0, 'ask_volume': 0}
buckets[bucket_key]['ask_volume'] += ask['total']
# Store bucketed data
self.cob_bucketed_data[symbol] = {
'timestamp': cob_snapshot['timestamp'],
'mid_price': mid_price,
'buckets': buckets,
'bucket_size_bps': bucket_size_bps
}
# Feed to models
self._feed_cob_data_to_models(symbol, cob_snapshot)
except Exception as e:
logger.debug(f"TEMPLATED DASHBOARD: Error generating bucketed COB data: {e}")
def _calculate_cumulative_imbalance(self, symbol: str) -> Dict[str, float]:
"""Calculate average imbalance over multiple time windows."""
stats = {}
now = time.time()
history = self.cob_data_history.get(symbol)
if not history:
return {'1s': 0.0, '5s': 0.0, '15s': 0.0, '60s': 0.0}
periods = {'1s': 1, '5s': 5, '15s': 15, '60s': 60}
for name, duration in periods.items():
recent_imbalances = []
for snap in history:
# Check if snap is a valid dict with timestamp and stats
if isinstance(snap, dict) and 'timestamp' in snap and (now - snap['timestamp'] <= duration) and 'stats' in snap and snap['stats']:
imbalance = snap['stats'].get('imbalance')
if imbalance is not None:
recent_imbalances.append(imbalance)
if recent_imbalances:
stats[name] = sum(recent_imbalances) / len(recent_imbalances)
else:
stats[name] = 0.0
# Debug logging to verify cumulative imbalance calculation
if any(value != 0.0 for value in stats.values()):
logger.debug(f"TEMPLATED DASHBOARD: [CUMULATIVE-IMBALANCE] {symbol}: {stats}")
return stats
def _feed_cob_data_to_models(self, symbol: str, cob_snapshot: dict):
"""Feed COB data to models for training and inference"""
try:
# Calculate cumulative imbalance for model feeding
cumulative_imbalance = self._calculate_cumulative_imbalance(symbol) # Assumes _calculate_cumulative_imbalance is available
history_data = {
'symbol': symbol,
'current_snapshot': cob_snapshot,
'history': list(self.cob_data_history[symbol]), # Convert deque to list for consistent slicing
'bucketed_data': self.cob_bucketed_data[symbol],
'cumulative_imbalance': cumulative_imbalance, # Add cumulative imbalance
'timestamp': cob_snapshot['timestamp']
}
# Pass to orchestrator for model feeding
if self.orchestrator and hasattr(self.orchestrator, 'feed_cob_data'):
self.orchestrator.feed_cob_data(symbol, history_data) # Assumes feed_cob_data exists in orchestrator
except Exception as e:
logger.debug(f"TEMPLATED DASHBOARD: Error feeding COB data to models: {e}")
def _is_signal_generation_active(self) -> bool:
"""Check if signal generation is active (e.g., models are loaded and running)"""
# For now, return true to always generate signals
# In a real system, this would check model loading status, training status, etc.
return True # Simplified for initial integration
def _start_signal_generation_loop(self):
"""Start signal generation loop to ensure continuous trading signals"""
try:
def signal_worker():
logger.info("TEMPLATED DASHBOARD: Signal generation worker started")
while True:
try:
# Ensure signal generation is active before processing
if self._is_signal_generation_active():
symbol = 'ETH/USDT' # Focus on ETH for now
current_price = self._get_current_price(symbol)
if current_price:
# Generate a momentum signal (simplified for demo)
signal = self._generate_momentum_signal(symbol, current_price) # Assumes _generate_momentum_signal is available
if signal:
self._process_dashboard_signal(signal) # Assumes _process_dashboard_signal is available
# Generate a DQN signal if enabled
if self.dqn_inference_enabled:
dqn_signal = self._generate_dqn_signal(symbol, current_price) # Assumes _generate_dqn_signal is available
if dqn_signal:
self._process_dashboard_signal(dqn_signal)
# Generate a CNN pivot signal if enabled
if self.cnn_inference_enabled:
cnn_signal = self._get_cnn_pivot_prediction() # Assumes _get_cnn_pivot_prediction is available
if cnn_signal:
self._process_dashboard_signal(cnn_signal)
# Update session metrics every 1 second interval to reflect new trades
self._update_session_metrics()
time.sleep(1) # Run every second for signal generation
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Error in signal worker: {e}")
time.sleep(5) # Longer sleep on error
signal_thread = threading.Thread(target=signal_worker, daemon=True)
signal_thread.start()
logger.info("TEMPLATED DASHBOARD: Signal generation loop started")
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Error starting signal generation loop: {e}")
def _start_actual_training_if_needed(self):
"""Start actual model training with real data collection and training loops"""
try:
if not self.orchestrator:
logger.warning("TEMPLATED DASHBOARD: No orchestrator available for training")
return
logger.info("TEMPLATED DASHBOARD: TRAINING: Starting actual training system with real data collection")
self._start_real_training_system()
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Error starting comprehensive training system: {e}")
def _start_real_training_system(self):
"""Start real training system with data collection and actual model training"""
try:
# Training performance metrics
self.training_performance = {
'decision': {'inference_times': [], 'training_times': [], 'total_calls': 0},
'cob_rl': {'inference_times': [], 'training_times': [], 'total_calls': 0},
'dqn': {'inference_times': [], 'training_times': [], 'total_calls': 0},
'cnn': {'inference_times': [], 'training_times': [], 'total_calls': 0},
'transformer': {'inference_times': [], 'training_times': [], 'total_calls': 0} # Added for transformer
}
def training_coordinator():
logger.info("TEMPLATED DASHBOARD: TRAINING: High-frequency training coordinator started")
training_iteration = 0
last_dqn_training = 0
last_cnn_training = 0
last_decision_training = 0
last_cob_rl_training = 0
last_transformer_training = 0 # For transformer
while True:
try:
training_iteration += 1
current_time = time.time()
market_data = self._collect_training_data() # Assumes _collect_training_data is available
if market_data:
logger.debug(f"TEMPLATED DASHBOARD: TRAINING: Collected {len(market_data)} market data points for training")
# High-frequency training for split-second decisions
# Train decision fusion and COB RL as fast as hardware allows
if current_time - last_decision_training > 0.1: # Every 100ms
start_time = time.time()
self._perform_real_decision_training(market_data) # Assumes _perform_real_decision_training is available
training_time = time.time() - start_time
self.training_performance['decision']['training_times'].append(training_time)
self.training_performance['decision']['total_calls'] += 1
last_decision_training = current_time
# Keep only last 100 measurements
if len(self.training_performance['decision']['training_times']) > 100:
self.training_performance['decision']['training_times'] = self.training_performance['decision']['training_times'][-100:]
# Advanced Transformer Training (every 200ms for comprehensive features)
if current_time - last_transformer_training > 0.2: # Every 200ms for transformer
start_time = time.time()
self._perform_real_transformer_training(market_data) # Assumes _perform_real_transformer_training is available
training_time = time.time() - start_time
self.training_performance['transformer']['training_times'].append(training_time)
self.training_performance['transformer']['total_calls'] += 1
last_transformer_training = current_time # Update last training time
# Keep only last 100 measurements
if len(self.training_performance['transformer']['training_times']) > 100:
self.training_performance['transformer']['training_times'] = self.training_performance['transformer']['training_times'][-100:]
if current_time - last_cob_rl_training > 0.1: # Every 100ms
start_time = time.time()
self._perform_real_cob_rl_training(market_data) # Assumes _perform_real_cob_rl_training is available
training_time = time.time() - start_time
self.training_performance['cob_rl']['training_times'].append(training_time)
self.training_performance['cob_rl']['total_calls'] += 1
last_cob_rl_training = current_time
# Keep only last 100 measurements
if len(self.training_performance['cob_rl']['training_times']) > 100:
self.training_performance['cob_rl']['training_times'] = self.training_performance['cob_rl']['training_times'][-100:]
# Standard frequency for larger models
if current_time - last_dqn_training > 30:
start_time = time.time()
self._perform_real_dqn_training(market_data) # Assumes _perform_real_dqn_training is available
training_time = time.time() - start_time
self.training_performance['dqn']['training_times'].append(training_time)
self.training_performance['dqn']['total_calls'] += 1
last_dqn_training = current_time
if len(self.training_performance['dqn']['training_times']) > 50:
self.training_performance['dqn']['training_times'] = self.training_performance['dqn']['training_times'][-50:]
if current_time - last_cnn_training > 45:
start_time = time.time()
self._perform_real_cnn_training(market_data) # Assumes _perform_real_cnn_training is available
training_time = time.time() - start_time
self.training_performance['cnn']['training_times'].append(training_time)
self.training_performance['cnn']['total_calls'] += 1
last_cnn_training = current_time
if len(self.training_performance['cnn']['training_times']) > 50:
self.training_performance['cnn']['training_times'] = self.training_performance['cnn']['training_times'][-50:]
self._update_training_progress(training_iteration) # Assumes _update_training_progress is available
# Log performance metrics every 100 iterations
if training_iteration % 100 == 0:
self._log_training_performance() # Assumes _log_training_performance is available
logger.info(f"TEMPLATED DASHBOARD: TRAINING: Iteration {training_iteration} - High-frequency training active")
# Minimal sleep for maximum responsiveness
time.sleep(0.05) # 50ms sleep for 20Hz training loop
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: TRAINING: Error in training iteration {training_iteration}: {e}")
time.sleep(1) # Shorter error recovery
training_thread = threading.Thread(target=training_coordinator, daemon=True)
training_thread.start()
logger.info("TEMPLATED DASHBOARD: Real training system started")
except Exception as e:
logger.error(f"TEMPLATED DASHBOARD: Error starting real training system: {e}")
def create_templated_dashboard(data_provider: Optional[DataProvider] = None, def create_templated_dashboard(data_provider: Optional[DataProvider] = None,
orchestrator: Optional[TradingOrchestrator] = None, orchestrator: Optional[TradingOrchestrator] = None,