import asyncio import json import logging from typing import Dict, List, Optional, Tuple, Union import websockets import plotly.graph_objects as go from plotly.subplots import make_subplots import dash from dash import html, dcc from dash.dependencies import Input, Output import pandas as pd import numpy as np from collections import deque import time from threading import Thread import requests import os from datetime import datetime, timedelta import pytz import tzlocal import threading import random import dash_bootstrap_components as dbc import uuid import ta from sklearn.preprocessing import MinMaxScaler import re import psutil import gc import websocket # Import psycopg2 with error handling try: import psycopg2 PSYCOPG2_AVAILABLE = True except ImportError: PSYCOPG2_AVAILABLE = False psycopg2 = None # TimescaleDB configuration from environment variables TIMESCALEDB_ENABLED = os.environ.get('TIMESCALEDB_ENABLED', '1') == '1' and PSYCOPG2_AVAILABLE TIMESCALEDB_HOST = os.environ.get('TIMESCALEDB_HOST', '192.168.0.10') TIMESCALEDB_PORT = int(os.environ.get('TIMESCALEDB_PORT', '5432')) TIMESCALEDB_USER = os.environ.get('TIMESCALEDB_USER', 'postgres') TIMESCALEDB_PASSWORD = os.environ.get('TIMESCALEDB_PASSWORD', 'timescaledbpass') TIMESCALEDB_DB = os.environ.get('TIMESCALEDB_DB', 'candles') class TimescaleDBHandler: """Handler for TimescaleDB operations for candle storage and retrieval""" def __init__(self): """Initialize TimescaleDB connection if enabled""" self.enabled = TIMESCALEDB_ENABLED self.conn = None if not self.enabled: if not PSYCOPG2_AVAILABLE: print("psycopg2 module not available. TimescaleDB integration disabled.") return try: # Connect to TimescaleDB self.conn = psycopg2.connect( host=TIMESCALEDB_HOST, port=TIMESCALEDB_PORT, user=TIMESCALEDB_USER, password=TIMESCALEDB_PASSWORD, dbname=TIMESCALEDB_DB ) print(f"Connected to TimescaleDB at {TIMESCALEDB_HOST}:{TIMESCALEDB_PORT}") # Ensure the candles table exists self._ensure_table() print("TimescaleDB integration initialized successfully") except Exception as e: print(f"Error connecting to TimescaleDB: {str(e)}") self.enabled = False self.conn = None def _ensure_table(self): """Ensure the candles table exists with TimescaleDB hypertable""" if not self.conn: return try: with self.conn.cursor() as cur: # Create the candles table if it doesn't exist cur.execute(''' CREATE TABLE IF NOT EXISTS candles ( symbol TEXT, interval TEXT, timestamp TIMESTAMPTZ, open DOUBLE PRECISION, high DOUBLE PRECISION, low DOUBLE PRECISION, close DOUBLE PRECISION, volume DOUBLE PRECISION, PRIMARY KEY (symbol, interval, timestamp) ); ''') # Check if the table is already a hypertable cur.execute(''' SELECT EXISTS ( SELECT 1 FROM timescaledb_information.hypertables WHERE hypertable_name = 'candles' ); ''') is_hypertable = cur.fetchone()[0] # Convert to hypertable if not already done if not is_hypertable: cur.execute(''' SELECT create_hypertable('candles', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE ); ''') self.conn.commit() print("TimescaleDB table structure verified") except Exception as e: print(f"Error setting up TimescaleDB tables: {str(e)}") self.enabled = False def upsert_candle(self, symbol, interval, candle): """Insert or update a candle in TimescaleDB""" if not self.enabled or not self.conn: return False try: with self.conn.cursor() as cur: cur.execute(''' INSERT INTO candles ( symbol, interval, timestamp, open, high, low, close, volume ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (symbol, interval, timestamp) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, volume = EXCLUDED.volume ''', ( symbol, interval, candle['timestamp'], candle['open'], candle['high'], candle['low'], candle['close'], candle['volume'] )) self.conn.commit() return True except Exception as e: print(f"Error upserting candle to TimescaleDB: {str(e)}") # Try to reconnect on error try: self.conn = psycopg2.connect( host=TIMESCALEDB_HOST, port=TIMESCALEDB_PORT, user=TIMESCALEDB_USER, password=TIMESCALEDB_PASSWORD, dbname=TIMESCALEDB_DB ) except: pass return False def fetch_candles(self, symbol, interval, limit=1000): """Fetch candles from TimescaleDB""" if not self.enabled or not self.conn: return [] try: with self.conn.cursor() as cur: cur.execute(''' SELECT timestamp, open, high, low, close, volume FROM candles WHERE symbol = %s AND interval = %s ORDER BY timestamp DESC LIMIT %s ''', (symbol, interval, limit)) rows = cur.fetchall() # Convert to list of dictionaries (ordered from oldest to newest) candles = [] for row in reversed(rows): # Reverse to get oldest first candle = { 'timestamp': row[0], 'open': row[1], 'high': row[2], 'low': row[3], 'close': row[4], 'volume': row[5] } candles.append(candle) return candles except Exception as e: print(f"Error fetching candles from TimescaleDB: {str(e)}") # Try to reconnect on error try: self.conn = psycopg2.connect( host=TIMESCALEDB_HOST, port=TIMESCALEDB_PORT, user=TIMESCALEDB_USER, password=TIMESCALEDB_PASSWORD, dbname=TIMESCALEDB_DB ) except: pass return [] class BinanceHistoricalData: """ Class for fetching historical price data from Binance. """ def __init__(self): self.base_url = "https://api.binance.com/api/v3" self.cache_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cache') if not os.path.exists(self.cache_dir): os.makedirs(self.cache_dir) # Timestamp of last data update self.last_update = None def get_historical_candles(self, symbol, interval_seconds=3600, limit=1000): """ Fetch historical candles from Binance API. Args: symbol (str): Trading pair symbol (e.g., "BTC/USDT") interval_seconds (int): Timeframe in seconds (e.g., 3600 for 1h) limit (int): Number of candles to fetch Returns: pd.DataFrame: DataFrame with OHLCV data """ # Convert interval_seconds to Binance interval format interval_map = { 1: "1s", 60: "1m", 300: "5m", 900: "15m", 1800: "30m", 3600: "1h", 14400: "4h", 86400: "1d" } interval = interval_map.get(interval_seconds, "1h") # Format symbol for Binance API (remove slash and make uppercase) formatted_symbol = symbol.replace("/", "").upper() # Check if we have cached data first cache_file = self._get_cache_filename(formatted_symbol, interval) cached_data = self._load_from_cache(formatted_symbol, interval) # If we have cached data that's recent enough, use it if cached_data is not None and len(cached_data) >= limit: cache_age_minutes = (datetime.now() - self.last_update).total_seconds() / 60 if self.last_update else 60 if cache_age_minutes < 15: # Only use cache if it's less than 15 minutes old logger.info(f"Using cached historical data for {symbol} ({interval})") return cached_data try: # Build URL for klines endpoint url = f"{self.base_url}/klines" params = { "symbol": formatted_symbol, "interval": interval, "limit": limit } # Make the request response = requests.get(url, params=params) response.raise_for_status() # Parse the response data = response.json() # Create dataframe df = pd.DataFrame(data, columns=[ "timestamp", "open", "high", "low", "close", "volume", "close_time", "quote_asset_volume", "number_of_trades", "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" ]) # Convert timestamp to datetime df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") # Convert price columns to float for col in ["open", "high", "low", "close", "volume"]: df[col] = df[col].astype(float) # Sort by timestamp df = df.sort_values("timestamp") # Save to cache for future use self._save_to_cache(df, formatted_symbol, interval) self.last_update = datetime.now() logger.info(f"Fetched {len(df)} candles for {symbol} ({interval})") return df except Exception as e: logger.error(f"Error fetching historical data from Binance: {str(e)}") # Return cached data if we have it, even if it's not enough if cached_data is not None: logger.warning(f"Using cached data instead (may be incomplete)") return cached_data # Return empty dataframe on error return pd.DataFrame() def _get_cache_filename(self, symbol, interval): """Get filename for cache file""" return os.path.join(self.cache_dir, f"{symbol}_{interval}_candles.csv") def _load_from_cache(self, symbol, interval): """Load candles from cache file""" try: cache_file = self._get_cache_filename(symbol, interval) if os.path.exists(cache_file): # For 1s interval, check if the cache is recent (less than 10 minutes old) if interval == "1s" or interval == 1: file_mod_time = datetime.fromtimestamp(os.path.getmtime(cache_file)) time_diff = (datetime.now() - file_mod_time).total_seconds() / 60 if time_diff > 10: logger.info("1s cache is older than 10 minutes, skipping load") return None logger.info(f"Using recent 1s cache (age: {time_diff:.1f} minutes)") df = pd.read_csv(cache_file) df["timestamp"] = pd.to_datetime(df["timestamp"]) logger.info(f"Loaded {len(df)} candles from cache: {cache_file}") return df except Exception as e: logger.error(f"Error loading cached data: {str(e)}") return None def _save_to_cache(self, df, symbol, interval): """Save candles to cache file""" try: cache_file = self._get_cache_filename(symbol, interval) df.to_csv(cache_file, index=False) logger.info(f"Saved {len(df)} candles to cache: {cache_file}") return True except Exception as e: logger.error(f"Error saving to cache: {str(e)}") return False def get_recent_trades(self, symbol, limit=1000): """Get recent trades for a symbol""" formatted_symbol = symbol.replace("/", "") try: url = f"{self.base_url}/trades" params = { "symbol": formatted_symbol, "limit": limit } response = requests.get(url, params=params) response.raise_for_status() data = response.json() # Create dataframe df = pd.DataFrame(data) df["time"] = pd.to_datetime(df["time"], unit="ms") df["price"] = df["price"].astype(float) df["qty"] = df["qty"].astype(float) return df except Exception as e: logger.error(f"Error fetching recent trades: {str(e)}") return pd.DataFrame() class MultiTimeframeDataInterface: """ Enhanced Data Interface supporting: - Multiple trading pairs - Multiple timeframes per pair (1s, 1m, 1h, 1d + custom) - Technical indicators - Cross-timeframe normalization - Real-time data updates """ def __init__(self, symbol=None, timeframes=None, data_dir="data"): """ Initialize the data interface. Args: symbol (str): Trading pair symbol (e.g., "BTC/USDT") timeframes (list): List of timeframes to use (e.g., ['1m', '5m', '1h', '4h', '1d']) data_dir (str): Directory to store/load datasets """ self.symbol = symbol self.timeframes = timeframes or ['1h', '4h', '1d'] self.data_dir = data_dir self.scalers = {} # Store scalers for each timeframe # Initialize the historical data fetcher self.historical_data = BinanceHistoricalData() # Create data directory if it doesn't exist os.makedirs(self.data_dir, exist_ok=True) # Initialize empty dataframes for each timeframe self.dataframes = {tf: None for tf in self.timeframes} # Store timestamps of last updates per timeframe self.last_updates = {tf: None for tf in self.timeframes} # Timeframe mapping (string to seconds) self.timeframe_to_seconds = { '1s': 1, '1m': 60, '5m': 300, '15m': 900, '30m': 1800, '1h': 3600, '4h': 14400, '1d': 86400 } logger.info(f"MultiTimeframeDataInterface initialized for {symbol} with timeframes {timeframes}") def get_data(self, timeframe='1h', n_candles=1000, refresh=False, add_indicators=True): """ Fetch historical price data for a given timeframe with optional indicators. Args: timeframe (str): Timeframe to fetch data for n_candles (int): Number of candles to fetch refresh (bool): Force refresh of the data add_indicators (bool): Whether to add technical indicators Returns: pd.DataFrame: DataFrame with OHLCV data and indicators """ # Check if we need to refresh current_time = datetime.now() if (not refresh and self.dataframes[timeframe] is not None and self.last_updates[timeframe] is not None and (current_time - self.last_updates[timeframe]).total_seconds() < 60): logger.info(f"Using cached data for {self.symbol} {timeframe}") return self.dataframes[timeframe] interval_seconds = self.timeframe_to_seconds.get(timeframe, 3600) # Fetch data df = self.historical_data.get_historical_candles( symbol=self.symbol, interval_seconds=interval_seconds, limit=n_candles ) if df is None or df.empty: logger.error(f"No data available for {self.symbol} {timeframe}") return None # Add indicators if requested if add_indicators: df = self.add_indicators(df) # Store in cache self.dataframes[timeframe] = df self.last_updates[timeframe] = current_time logger.info(f"Fetched and processed {len(df)} candles for {self.symbol} {timeframe}") return df def add_indicators(self, df): """ Add comprehensive technical indicators to the dataframe. Args: df (pd.DataFrame): DataFrame with OHLCV data Returns: pd.DataFrame: DataFrame with added technical indicators """ # Make a copy to avoid modifying the original df_copy = df.copy() # Basic price indicators df_copy['returns'] = df_copy['close'].pct_change() df_copy['log_returns'] = np.log(df_copy['close'] / df_copy['close'].shift(1)) # Moving Averages df_copy['sma_7'] = ta.trend.sma_indicator(df_copy['close'], window=7) df_copy['sma_25'] = ta.trend.sma_indicator(df_copy['close'], window=25) df_copy['sma_99'] = ta.trend.sma_indicator(df_copy['close'], window=99) df_copy['ema_9'] = ta.trend.ema_indicator(df_copy['close'], window=9) df_copy['ema_21'] = ta.trend.ema_indicator(df_copy['close'], window=21) # MACD macd = ta.trend.MACD(df_copy['close']) df_copy['macd'] = macd.macd() df_copy['macd_signal'] = macd.macd_signal() df_copy['macd_diff'] = macd.macd_diff() # RSI df_copy['rsi'] = ta.momentum.rsi(df_copy['close'], window=14) # Bollinger Bands bollinger = ta.volatility.BollingerBands(df_copy['close']) df_copy['bb_high'] = bollinger.bollinger_hband() df_copy['bb_low'] = bollinger.bollinger_lband() df_copy['bb_pct'] = bollinger.bollinger_pband() # Stochastic Oscillator stoch = ta.momentum.StochasticOscillator(df_copy['high'], df_copy['low'], df_copy['close']) df_copy['stoch_k'] = stoch.stoch() df_copy['stoch_d'] = stoch.stoch_signal() # ATR - Average True Range df_copy['atr'] = ta.volatility.average_true_range(df_copy['high'], df_copy['low'], df_copy['close'], window=14) # Money Flow Index df_copy['mfi'] = ta.volume.money_flow_index(df_copy['high'], df_copy['low'], df_copy['close'], df_copy['volume'], window=14) # OBV - On-Balance Volume df_copy['obv'] = ta.volume.on_balance_volume(df_copy['close'], df_copy['volume']) # Ichimoku Cloud ichimoku = ta.trend.IchimokuIndicator(df_copy['high'], df_copy['low']) df_copy['ichimoku_a'] = ichimoku.ichimoku_a() df_copy['ichimoku_b'] = ichimoku.ichimoku_b() df_copy['ichimoku_base'] = ichimoku.ichimoku_base_line() df_copy['ichimoku_conv'] = ichimoku.ichimoku_conversion_line() # ADX - Average Directional Index adx = ta.trend.ADXIndicator(df_copy['high'], df_copy['low'], df_copy['close']) df_copy['adx'] = adx.adx() df_copy['adx_pos'] = adx.adx_pos() df_copy['adx_neg'] = adx.adx_neg() # VWAP - Volume Weighted Average Price (intraday) # Custom calculation since TA library doesn't include VWAP df_copy['vwap'] = (df_copy['volume'] * (df_copy['high'] + df_copy['low'] + df_copy['close']) / 3).cumsum() / df_copy['volume'].cumsum() # Fill NaN values df_copy = df_copy.fillna(method='bfill').fillna(0) return df_copy def get_multi_timeframe_data(self, timeframes=None, n_candles=1000, refresh=False, add_indicators=True): """ Fetch data for multiple timeframes. Args: timeframes (list): List of timeframes to fetch n_candles (int): Number of candles to fetch for each timeframe refresh (bool): Force refresh of the data add_indicators (bool): Whether to add technical indicators Returns: dict: Dictionary of dataframes indexed by timeframe """ if timeframes is None: timeframes = self.timeframes result = {} for tf in timeframes: # For higher timeframes, we need fewer candles tf_candles = n_candles if tf == '4h': tf_candles = max(250, n_candles // 4) elif tf == '1d': tf_candles = max(100, n_candles // 24) df = self.get_data(timeframe=tf, n_candles=tf_candles, refresh=refresh, add_indicators=add_indicators) if df is not None and not df.empty: result[tf] = df return result def prepare_training_data(self, window_size=20, train_ratio=0.8, refresh=False): """ Prepare training data from multiple timeframes. Args: window_size (int): Size of the sliding window train_ratio (float): Ratio of data to use for training refresh (bool): Whether to refresh the data Returns: tuple: (X_train, y_train, X_val, y_val, train_prices, val_prices) """ # Get data for all timeframes data_dict = self.get_multi_timeframe_data(refresh=refresh) if not data_dict: logger.error("Failed to fetch data for any timeframe") return None, None, None, None, None, None # Align all dataframes by timestamp all_dfs = list(data_dict.values()) min_date = max([df['timestamp'].min() for df in all_dfs]) max_date = min([df['timestamp'].max() for df in all_dfs]) aligned_dfs = {} for tf, df in data_dict.items(): aligned_df = df[(df['timestamp'] >= min_date) & (df['timestamp'] <= max_date)] aligned_dfs[tf] = aligned_df # Choose the lowest timeframe as the reference for time alignment reference_tf = min(self.timeframes, key=lambda x: self.timeframe_to_seconds.get(x, 3600)) reference_df = aligned_dfs[reference_tf] # Create sliding windows for each timeframe X_dict = {} for tf, df in aligned_dfs.items(): # Drop timestamp and create numeric features features = df.drop('timestamp', axis=1).values # Ensure the feature array is 3D: [samples, window, features] X = np.array([features[i:i+window_size] for i in range(len(features)-window_size)]) X_dict[tf] = X # Create target labels based on future price movements reference_prices = reference_df['close'].values future_prices = reference_prices[window_size:] current_prices = reference_prices[window_size-1:-1] # Calculate returns returns = (future_prices - current_prices) / current_prices # Create labels: 0=SELL, 1=HOLD, 2=BUY threshold = 0.0005 # 0.05% threshold y = np.zeros(len(returns), dtype=int) y[returns > threshold] = 2 # BUY y[returns < -threshold] = 0 # SELL y[(returns >= -threshold) & (returns <= threshold)] = 1 # HOLD # Split into training and validation sets split_idx = int(len(y) * train_ratio) X_train_dict = {tf: X[:split_idx] for tf, X in X_dict.items()} X_val_dict = {tf: X[split_idx:] for tf, X in X_dict.items()} y_train = y[:split_idx] y_val = y[split_idx:] train_prices = reference_prices[window_size-1:window_size-1+split_idx] val_prices = reference_prices[window_size-1+split_idx:window_size-1+len(y)] logger.info(f"Prepared training data - Train: {len(y_train)}, Val: {len(y_val)}") return X_train_dict, y_train, X_val_dict, y_val, train_prices, val_prices def normalize_data(self, data_dict, fit=True): """ Normalize data across all timeframes. Args: data_dict (dict): Dictionary of data arrays by timeframe fit (bool): Whether to fit new scalers or use existing ones Returns: dict: Dictionary of normalized data arrays """ result = {} for tf, data in data_dict.items(): # For 3D data [samples, window, features] if len(data.shape) == 3: samples, window, features = data.shape reshaped = data.reshape(-1, features) if fit or tf not in self.scalers: self.scalers[tf] = MinMaxScaler() normalized = self.scalers[tf].fit_transform(reshaped) else: normalized = self.scalers[tf].transform(reshaped) result[tf] = normalized.reshape(samples, window, features) # For 2D data [samples, features] elif len(data.shape) == 2: if fit or tf not in self.scalers: self.scalers[tf] = MinMaxScaler() result[tf] = self.scalers[tf].fit_transform(data) else: result[tf] = self.scalers[tf].transform(data) return result def get_realtime_features(self, timeframes=None, window_size=20): """ Get the most recent data for real-time prediction. Args: timeframes (list): List of timeframes to use window_size (int): Size of the sliding window Returns: dict: Dictionary of feature arrays for the latest window """ if timeframes is None: timeframes = self.timeframes # Get fresh data data_dict = self.get_multi_timeframe_data(timeframes=timeframes, refresh=True) result = {} for tf, df in data_dict.items(): if len(df) < window_size: logger.warning(f"Not enough data for {tf} (need {window_size}, got {len(df)})") continue # Get the latest window latest_data = df.tail(window_size).drop('timestamp', axis=1).values # Add extra dimension to match model input shape [1, window_size, features] result[tf] = latest_data.reshape(1, window_size, -1) # Apply normalization using existing scalers if self.scalers: result = self.normalize_data(result, fit=False) return result def calculate_pnl(self, predictions, prices, position_size=1.0, fee_rate=0.0002): """ Calculate PnL and win rate from predictions. Args: predictions (np.ndarray): Array of predicted actions (0=SELL, 1=HOLD, 2=BUY) prices (np.ndarray): Array of prices position_size (float): Size of each position fee_rate (float): Trading fee rate (default: 0.0002 for 0.02% per trade) Returns: tuple: (total_pnl, win_rate, trades) """ if len(predictions) < 2 or len(prices) < 2: return 0.0, 0.0, [] # Ensure arrays are the same length min_len = min(len(predictions), len(prices)-1) actions = predictions[:min_len] pnl = 0.0 wins = 0 trades = [] for i in range(min_len): current_price = prices[i] next_price = prices[i+1] action = actions[i] # Skip HOLD actions if action == 1: continue price_change = (next_price - current_price) / current_price if action == 2: # BUY # Calculate raw PnL raw_pnl = price_change * position_size # Calculate fees (entry and exit) entry_fee = position_size * fee_rate exit_fee = position_size * (1 + price_change) * fee_rate total_fees = entry_fee + exit_fee # Net PnL after fees trade_pnl = raw_pnl - total_fees trade_type = 'BUY' is_win = trade_pnl > 0 elif action == 0: # SELL # Calculate raw PnL raw_pnl = -price_change * position_size # Calculate fees (entry and exit) entry_fee = position_size * fee_rate exit_fee = position_size * (1 - price_change) * fee_rate total_fees = entry_fee + exit_fee # Net PnL after fees trade_pnl = raw_pnl - total_fees trade_type = 'SELL' is_win = trade_pnl > 0 else: continue pnl += trade_pnl wins += int(is_win) trades.append({ 'type': trade_type, 'entry': float(current_price), # Ensure serializable 'exit': float(next_price), 'raw_pnl': float(raw_pnl), 'fees': float(total_fees), 'pnl': float(trade_pnl), 'win': bool(is_win), 'timestamp': datetime.now().isoformat() # Add timestamp }) win_rate = wins / len(trades) if trades else 0.0 return float(pnl), float(win_rate), trades # Configure logging with more detailed format logging.basicConfig( level=logging.INFO, # Changed to DEBUG for more detailed logs format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('realtime_chart.log') ] ) logger = logging.getLogger(__name__) # Neural Network integration (conditional import) NN_ENABLED = os.environ.get('ENABLE_NN_MODELS', '0') == '1' nn_orchestrator = None nn_inference_thread = None if NN_ENABLED: try: import sys # Add project root to sys.path if needed project_root = os.path.dirname(os.path.abspath(__file__)) if project_root not in sys.path: sys.path.append(project_root) from NN.main import NeuralNetworkOrchestrator logger.info("Neural Network module enabled") except ImportError as e: logger.warning(f"Failed to import Neural Network module, disabling NN features: {str(e)}") NN_ENABLED = False # NN utility functions def setup_neural_network(): """Initialize the neural network components if enabled""" global nn_orchestrator, NN_ENABLED if not NN_ENABLED: return False try: # Get configuration from environment variables or use defaults symbol = os.environ.get('NN_SYMBOL', 'ETH/USDT') timeframes = os.environ.get('NN_TIMEFRAMES', '1m,5m,1h,4h,1d').split(',') output_size = int(os.environ.get('NN_OUTPUT_SIZE', '3')) # 3 for BUY/HOLD/SELL # Configure the orchestrator config = { 'symbol': symbol, 'timeframes': timeframes, 'window_size': int(os.environ.get('NN_WINDOW_SIZE', '20')), 'n_features': 5, # OHLCV 'output_size': output_size, 'model_dir': 'NN/models/saved', 'data_dir': 'NN/data' } # Initialize the orchestrator logger.info(f"Initializing Neural Network Orchestrator with config: {config}") nn_orchestrator = NeuralNetworkOrchestrator(config) # Load the model model_loaded = nn_orchestrator.load_model() if not model_loaded: logger.warning("Failed to load neural network model. Using untrained model.") return model_loaded except Exception as e: logger.error(f"Error setting up neural network: {str(e)}") NN_ENABLED = False return False def start_nn_inference_thread(interval_seconds): """Start a background thread to periodically run inference with the neural network""" global nn_inference_thread if not NN_ENABLED or nn_orchestrator is None: logger.warning("Cannot start inference thread - Neural Network not enabled or initialized") return False def inference_worker(): """Worker function for the inference thread""" model_type = os.environ.get('NN_MODEL_TYPE', 'cnn') timeframe = os.environ.get('NN_TIMEFRAME', '1h') logger.info(f"Starting neural network inference thread with {interval_seconds}s interval") logger.info(f"Using model type: {model_type}, timeframe: {timeframe}") # Wait a bit for charts to initialize time.sleep(5) # Track active charts active_charts = [] while True: try: # Find active charts if we don't have them yet if not active_charts and 'charts' in globals(): active_charts = globals()['charts'] logger.info(f"Found {len(active_charts)} active charts for NN signals") # Run inference result = nn_orchestrator.run_inference_pipeline( model_type=model_type, timeframe=timeframe ) if result: # Log the result logger.info(f"Neural network inference result: {result}") # Add signal to charts if active_charts: try: if 'action' in result: action = result['action'] timestamp = datetime.fromisoformat(result['timestamp'].replace('Z', '+00:00')) # Get probability if available probability = None if 'probability' in result: probability = result['probability'] elif 'probabilities' in result: probability = result['probabilities'].get(action, None) # Add signal to each chart for chart in active_charts: if hasattr(chart, 'add_nn_signal'): chart.add_nn_signal(action, timestamp, probability) except Exception as e: logger.error(f"Error adding NN signal to chart: {str(e)}") import traceback logger.error(traceback.format_exc()) # Sleep for the interval time.sleep(interval_seconds) except Exception as e: logger.error(f"Error in inference thread: {str(e)}") import traceback logger.error(traceback.format_exc()) time.sleep(5) # Wait a bit before retrying # Create and start the thread nn_inference_thread = threading.Thread(target=inference_worker, daemon=True) nn_inference_thread.start() return True # Try to get local timezone, default to Sofia/EET if not available try: local_timezone = tzlocal.get_localzone() # Get timezone name safely try: tz_name = str(local_timezone) # Handle case where it might be zoneinfo.ZoneInfo object instead of pytz timezone if hasattr(local_timezone, 'zone'): tz_name = local_timezone.zone elif hasattr(local_timezone, 'key'): tz_name = local_timezone.key else: tz_name = str(local_timezone) except: tz_name = "Local" logger.info(f"Detected local timezone: {local_timezone} ({tz_name})") except Exception as e: logger.warning(f"Could not detect local timezone: {str(e)}. Defaulting to Sofia/EET") local_timezone = pytz.timezone('Europe/Sofia') tz_name = "Europe/Sofia" def convert_to_local_time(timestamp): """Convert timestamp to local timezone""" try: if isinstance(timestamp, pd.Timestamp): dt = timestamp.to_pydatetime() elif isinstance(timestamp, np.datetime64): dt = pd.Timestamp(timestamp).to_pydatetime() elif isinstance(timestamp, str): dt = pd.to_datetime(timestamp).to_pydatetime() else: dt = timestamp # If datetime is naive (no timezone), assume it's UTC if dt.tzinfo is None: dt = dt.replace(tzinfo=pytz.UTC) # Convert to local timezone local_dt = dt.astimezone(local_timezone) return local_dt except Exception as e: logger.error(f"Error converting timestamp to local time: {str(e)}") return timestamp # Initialize TimescaleDB handler - only once, at module level timescaledb_handler = TimescaleDBHandler() if TIMESCALEDB_ENABLED else None class TickStorage: def __init__(self, symbol, timeframes=None, use_timescaledb=False): """Initialize the tick storage for a specific symbol""" self.symbol = symbol self.timeframes = timeframes or ["1s", "5m", "15m", "1h", "4h", "1d"] self.ticks = [] self.candles = {tf: [] for tf in self.timeframes} self.current_candle = {tf: None for tf in self.timeframes} self.last_candle_timestamp = {tf: None for tf in self.timeframes} self.cache_dir = os.path.join(os.getcwd(), "cache", symbol.replace("/", "")) self.cache_path = os.path.join(self.cache_dir, f"{symbol.replace('/', '')}_ticks.json") # Add missing cache_path self.use_timescaledb = use_timescaledb self.max_ticks = 10000 # Maximum number of ticks to store in memory # Create cache directory if it doesn't exist os.makedirs(self.cache_dir, exist_ok=True) logger.info(f"Creating new tick storage for {symbol} with timeframes {self.timeframes}") logger.info(f"Cache directory: {self.cache_dir}") logger.info(f"Cache file: {self.cache_path}") if use_timescaledb: print(f"TickStorage: TimescaleDB integration is ENABLED for {symbol}") else: logger.info(f"TickStorage: TimescaleDB integration is DISABLED for {symbol}") def _save_to_cache(self): """Save ticks to a cache file""" try: # Only save the latest 5000 ticks to avoid giant files ticks_to_save = self.ticks[-5000:] if len(self.ticks) > 5000 else self.ticks # Convert pandas Timestamps to ISO strings for JSON serialization serializable_ticks = [] for tick in ticks_to_save: serializable_tick = tick.copy() if isinstance(tick['timestamp'], pd.Timestamp): serializable_tick['timestamp'] = tick['timestamp'].isoformat() elif hasattr(tick['timestamp'], 'isoformat'): serializable_tick['timestamp'] = tick['timestamp'].isoformat() else: # Keep as is if it's already a string or number serializable_tick['timestamp'] = tick['timestamp'] serializable_ticks.append(serializable_tick) with open(self.cache_path, 'w') as f: json.dump(serializable_ticks, f) logger.debug(f"Saved {len(serializable_ticks)} ticks to cache") except Exception as e: logger.error(f"Error saving ticks to cache: {e}") def _load_from_cache(self): """Load ticks from cache if available""" if os.path.exists(self.cache_path): try: # Check if the cache file is recent (< 10 minutes old) cache_age = time.time() - os.path.getmtime(self.cache_path) if cache_age > 600: # 10 minutes in seconds logger.warning(f"Cache file is {cache_age:.1f} seconds old (>10 min). Not using it.") return False with open(self.cache_path, 'r') as f: cached_ticks = json.load(f) if cached_ticks: # Convert ISO strings back to pandas Timestamps processed_ticks = [] for tick in cached_ticks: processed_tick = tick.copy() if isinstance(tick['timestamp'], str): try: processed_tick['timestamp'] = pd.Timestamp(tick['timestamp']) except: # If parsing fails, use current time processed_tick['timestamp'] = pd.Timestamp.now() else: # Convert to pandas Timestamp if it's a number (milliseconds) processed_tick['timestamp'] = pd.Timestamp(tick['timestamp'], unit='ms') processed_ticks.append(processed_tick) self.ticks = processed_ticks logger.info(f"Loaded {len(cached_ticks)} ticks from cache") return True except Exception as e: logger.error(f"Error loading ticks from cache: {e}") return False def add_tick(self, tick=None, price=None, volume=None, timestamp=None): """ Add a tick to the storage and update candles for all timeframes Args: tick (dict, optional): A tick object containing price, quantity and timestamp price (float, optional): Price of the tick (used in older interface) volume (float, optional): Volume of the tick (used in older interface) timestamp (datetime, optional): Timestamp of the tick (used in older interface) """ # Handle tick as a dict or separate parameters for backward compatibility if tick is not None and isinstance(tick, dict): # Using the new interface with a tick object price = tick['price'] volume = tick.get('quantity', 0) timestamp = tick['timestamp'] elif price is not None: # Using the old interface with separate parameters # Convert datetime to pd.Timestamp if needed if timestamp is not None and not isinstance(timestamp, pd.Timestamp): timestamp = pd.Timestamp(timestamp) else: logger.error("Invalid tick: must provide either a tick dict or price") return # Ensure timestamp is a pandas Timestamp if not isinstance(timestamp, pd.Timestamp): if isinstance(timestamp, (int, float)): # Assume it's milliseconds timestamp = pd.Timestamp(timestamp, unit='ms') else: # Try to parse as string or datetime timestamp = pd.Timestamp(timestamp) # Create tick object with consistent pandas Timestamp tick_obj = { 'price': float(price), 'quantity': float(volume) if volume is not None else 0.0, 'timestamp': timestamp } # Add to the list of ticks self.ticks.append(tick_obj) # Limit the number of ticks to avoid memory issues if len(self.ticks) > self.max_ticks: self.ticks = self.ticks[-self.max_ticks:] # Update candles for all timeframes for timeframe in self.timeframes: if timeframe == "1s": self._update_1s_candle(tick_obj) else: self._update_candles_for_timeframe(timeframe, tick_obj) # Cache to disk periodically self._try_cache_ticks() def _update_1s_candle(self, tick): """Update the 1-second candle with the new tick""" # Get timestamp for the start of the current second tick_timestamp = tick['timestamp'] candle_timestamp = pd.Timestamp(int(tick_timestamp.timestamp() // 1 * 1_000_000_000)) # Check if we need to create a new candle if self.current_candle["1s"] is None or self.current_candle["1s"]["timestamp"] != candle_timestamp: # If we have a current candle, finalize it and add to candles list if self.current_candle["1s"] is not None: # Add the completed candle to the list self.candles["1s"].append(self.current_candle["1s"]) # Limit the number of stored candles to prevent memory issues if len(self.candles["1s"]) > 3600: # Keep last hour of 1s candles self.candles["1s"] = self.candles["1s"][-3600:] # Store in TimescaleDB if enabled if self.use_timescaledb: timescaledb_handler.upsert_candle( self.symbol, "1s", self.current_candle["1s"] ) # Log completed candle for debugging logger.debug(f"Completed 1s candle: {self.current_candle['1s']['timestamp']} - Close: {self.current_candle['1s']['close']}") # Create a new candle self.current_candle["1s"] = { "timestamp": candle_timestamp, "open": float(tick["price"]), "high": float(tick["price"]), "low": float(tick["price"]), "close": float(tick["price"]), "volume": float(tick["quantity"]) if "quantity" in tick else 0.0 } # Update last candle timestamp self.last_candle_timestamp["1s"] = candle_timestamp logger.debug(f"Created new 1s candle at {candle_timestamp}") else: # Update the current candle current = self.current_candle["1s"] price = float(tick["price"]) # Update high and low if price > current["high"]: current["high"] = price if price < current["low"]: current["low"] = price # Update close price and add volume current["close"] = price current["volume"] += float(tick["quantity"]) if "quantity" in tick else 0.0 def _update_candles_for_timeframe(self, timeframe, tick): """Update candles for a specific timeframe""" # Skip 1s as it's handled separately if timeframe == "1s": return # Convert timeframe to seconds timeframe_seconds = self._timeframe_to_seconds(timeframe) # Get the timestamp truncated to the timeframe interval # e.g., for a 5m candle, the timestamp should be truncated to the nearest 5-minute mark # Convert timestamp to datetime if it's not already tick_timestamp = tick['timestamp'] if isinstance(tick_timestamp, pd.Timestamp): ts = tick_timestamp else: ts = pd.Timestamp(tick_timestamp) # Truncate timestamp to nearest timeframe interval timestamp = pd.Timestamp( int(ts.timestamp() // timeframe_seconds * timeframe_seconds * 1_000_000_000) ) # Get the current candle for this timeframe current_candle = self.current_candle[timeframe] # If we have no current candle or the timestamp is different (new candle) if current_candle is None or current_candle['timestamp'] != timestamp: # If we have a current candle, add it to the candles list if current_candle: self.candles[timeframe].append(current_candle) # Save to TimescaleDB if enabled if self.use_timescaledb: timescaledb_handler.upsert_candle(self.symbol, timeframe, current_candle) # Create a new candle current_candle = { 'timestamp': timestamp, 'open': tick['price'], 'high': tick['price'], 'low': tick['price'], 'close': tick['price'], 'volume': tick.get('quantity', 0) } # Update current candle self.current_candle[timeframe] = current_candle self.last_candle_timestamp[timeframe] = timestamp else: # Update existing candle current_candle['high'] = max(current_candle['high'], tick['price']) current_candle['low'] = min(current_candle['low'], tick['price']) current_candle['close'] = tick['price'] current_candle['volume'] += tick.get('quantity', 0) # Limit the number of candles to avoid memory issues max_candles = 1000 if len(self.candles[timeframe]) > max_candles: self.candles[timeframe] = self.candles[timeframe][-max_candles:] def _timeframe_to_seconds(self, timeframe): """Convert a timeframe string (e.g., '1m', '1h') to seconds""" if timeframe == "1s": return 1 try: # Extract the number and unit match = re.match(r'(\d+)([smhdw])', timeframe) if not match: return None num, unit = match.groups() num = int(num) # Convert to seconds if unit == 's': return num elif unit == 'm': return num * 60 elif unit == 'h': return num * 3600 elif unit == 'd': return num * 86400 elif unit == 'w': return num * 604800 return None except: return None def get_candles(self, timeframe, limit=None): """Get candles for a given timeframe""" if timeframe in self.candles: candles = self.candles[timeframe] # Add the current candle if it exists and isn't None if timeframe in self.current_candle and self.current_candle[timeframe] is not None: # Make a copy of the current candle current_candle_copy = self.current_candle[timeframe].copy() # Check if the current candle is newer than the last candle in the list if not candles or current_candle_copy["timestamp"] > candles[-1]["timestamp"]: candles = candles + [current_candle_copy] # Apply limit if provided if limit and len(candles) > limit: return candles[-limit:] return candles return [] def get_last_price(self): """Get the last known price""" if self.ticks: return float(self.ticks[-1]["price"]) return None def load_historical_data(self, symbol, limit=1000): """Load historical data for all timeframes""" logger.info(f"Starting historical data load for {symbol} with limit {limit}") # Clear existing data self.ticks = [] self.candles = {tf: [] for tf in self.timeframes} self.current_candle = {tf: None for tf in self.timeframes} # Try to load ticks from cache first logger.info("Attempting to load from cache...") cache_loaded = self._load_from_cache() if cache_loaded: logger.info("Successfully loaded data from cache") else: logger.info("No valid cache data found") # Check if we have TimescaleDB enabled if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: logger.info("Attempting to fetch historical data from TimescaleDB") loaded_from_db = False # Load candles for each timeframe from TimescaleDB for tf in self.timeframes: try: candles = timescaledb_handler.fetch_candles(symbol, tf, limit) if candles: self.candles[tf] = candles loaded_from_db = True logger.info(f"Loaded {len(candles)} {tf} candles from TimescaleDB") else: logger.info(f"No {tf} candles found in TimescaleDB") except Exception as e: logger.error(f"Error loading {tf} candles from TimescaleDB: {str(e)}") if loaded_from_db: logger.info("Successfully loaded historical data from TimescaleDB") return True else: logger.info("TimescaleDB not available or disabled") # If no TimescaleDB data and no cache, we need to get from Binance API if not cache_loaded: logger.info("Loading data from Binance API...") # Create a BinanceHistoricalData instance historical_data = BinanceHistoricalData() # Load data for each timeframe success_count = 0 for tf in self.timeframes: if tf != "1s": # Skip 1s since we'll generate it from ticks try: logger.info(f"Fetching {tf} candles for {symbol}...") df = historical_data.get_historical_candles(symbol, self._timeframe_to_seconds(tf), limit) if df is not None and not df.empty: logger.info(f"Loaded {len(df)} {tf} candles from Binance API") # Convert to our candle format and store candles = [] for _, row in df.iterrows(): candle = { 'timestamp': row['timestamp'], 'open': row['open'], 'high': row['high'], 'low': row['low'], 'close': row['close'], 'volume': row['volume'] } candles.append(candle) # Also save to TimescaleDB if enabled if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: timescaledb_handler.upsert_candle(symbol, tf, candle) self.candles[tf] = candles success_count += 1 else: logger.warning(f"No data returned for {tf} candles") except Exception as e: logger.error(f"Error loading {tf} candles: {str(e)}") import traceback logger.error(traceback.format_exc()) logger.info(f"Successfully loaded {success_count} timeframes from Binance API") # For 1s, load from API if possible or compute from first available timeframe if "1s" in self.timeframes: logger.info("Loading 1s candles...") # Try to get 1s data from Binance try: df_1s = historical_data.get_historical_candles(symbol, 1, 300) # Only need recent 1s data if df_1s is not None and not df_1s.empty: logger.info(f"Loaded {len(df_1s)} recent 1s candles from Binance API") # Convert to our candle format and store candles_1s = [] for _, row in df_1s.iterrows(): candle = { 'timestamp': row['timestamp'], 'open': row['open'], 'high': row['high'], 'low': row['low'], 'close': row['close'], 'volume': row['volume'] } candles_1s.append(candle) # Also save to TimescaleDB if enabled if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: timescaledb_handler.upsert_candle(symbol, "1s", candle) self.candles["1s"] = candles_1s except Exception as e: logger.error(f"Error loading 1s candles: {str(e)}") # If 1s data not available or failed to load, approximate from 1m data if not self.candles.get("1s"): logger.info("1s data not available, trying to approximate from 1m data...") # If 1s data not available, we can approximate from 1m data if "1m" in self.timeframes and self.candles["1m"]: # For demonstration, just use the 1m candles as placeholders for 1s # In a real implementation, you might want more sophisticated interpolation logger.info("Using 1m candles as placeholders for 1s timeframe") self.candles["1s"] = [] # Take the most recent 5 minutes of 1m candles recent_1m = self.candles["1m"][-5:] if self.candles["1m"] else [] logger.info(f"Creating 1s approximations from {len(recent_1m)} 1m candles") for candle_1m in recent_1m: # Create 60 1s candles for each 1m candle ts_base = candle_1m["timestamp"].timestamp() for i in range(60): # Create a 1s candle with interpolated values candle_1s = { 'timestamp': pd.Timestamp(int((ts_base + i) * 1_000_000_000)), 'open': candle_1m['open'], 'high': candle_1m['high'], 'low': candle_1m['low'], 'close': candle_1m['close'], 'volume': candle_1m['volume'] / 60.0 # Distribute volume evenly } self.candles["1s"].append(candle_1s) # Also save to TimescaleDB if enabled if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: timescaledb_handler.upsert_candle(symbol, "1s", candle_1s) logger.info(f"Created {len(self.candles['1s'])} approximated 1s candles") else: logger.warning("No 1m data available to approximate 1s candles from") # Set the last candle of each timeframe as the current candle for tf in self.timeframes: if self.candles[tf]: self.current_candle[tf] = self.candles[tf][-1].copy() self.last_candle_timestamp[tf] = self.current_candle[tf]["timestamp"] logger.debug(f"Set current candle for {tf}: {self.current_candle[tf]['timestamp']}") # If we loaded ticks from cache, rebuild candles if cache_loaded: logger.info("Rebuilding candles from cached ticks...") # Clear candles self.candles = {tf: [] for tf in self.timeframes} self.current_candle = {tf: None for tf in self.timeframes} # Process each tick to rebuild the candles for tick in self.ticks: for tf in self.timeframes: if tf == "1s": self._update_1s_candle(tick) else: self._update_candles_for_timeframe(tf, tick) logger.info("Finished rebuilding candles from ticks") # Log final results for tf in self.timeframes: count = len(self.candles[tf]) logger.info(f"Final {tf} candle count: {count}") has_data = cache_loaded or any(self.candles[tf] for tf in self.timeframes) logger.info(f"Historical data loading completed. Has data: {has_data}") return has_data def _try_cache_ticks(self): """Try to save ticks to cache periodically""" # Only save to cache every 100 ticks to avoid excessive disk I/O if len(self.ticks) % 100 == 0: try: self._save_to_cache() except Exception as e: # Don't spam logs with cache errors, just log once every 1000 ticks if len(self.ticks) % 1000 == 0: logger.warning(f"Cache save failed at {len(self.ticks)} ticks: {str(e)}") pass # Continue even if cache fails class Position: """Represents a trading position""" def __init__(self, action, entry_price, amount, timestamp=None, trade_id=None, fee_rate=0.0002): self.action = action self.entry_price = entry_price self.amount = amount self.entry_timestamp = timestamp or datetime.now() self.exit_timestamp = None self.exit_price = None self.pnl = None self.is_open = True self.trade_id = trade_id or str(uuid.uuid4())[:8] self.fee_rate = fee_rate self.paid_fee = entry_price * amount * fee_rate # Calculate entry fee def close(self, exit_price, exit_timestamp=None): """Close an open position""" self.exit_price = exit_price self.exit_timestamp = exit_timestamp or datetime.now() self.is_open = False # Calculate P&L if self.action == "BUY": price_diff = self.exit_price - self.entry_price # Calculate fee for exit trade exit_fee = exit_price * self.amount * self.fee_rate self.paid_fee += exit_fee # Add exit fee to total paid fee self.pnl = (price_diff * self.amount) - self.paid_fee else: # SELL price_diff = self.entry_price - self.exit_price # Calculate fee for exit trade exit_fee = exit_price * self.amount * self.fee_rate self.paid_fee += exit_fee # Add exit fee to total paid fee self.pnl = (price_diff * self.amount) - self.paid_fee return self.pnl class RealTimeChart: def __init__(self, app=None, symbol='BTCUSDT', timeframe='1m', standalone=True, chart_title=None, run_signal_interpreter=False, debug_mode=False, historical_candles=None, extended_hours=False, enable_logging=True, agent=None, trading_env=None, max_memory_usage=90, memory_check_interval=10, tick_update_interval=0.5, chart_update_interval=1, performance_monitoring=False, show_volume=True, show_indicators=True, custom_trades=None, port=8050, height=900, width=1200, positions_callback=None, allow_synthetic_data=True, tick_storage=None): """Initialize a real-time chart with support for multiple indicators and backtesting.""" # Store parameters self.symbol = symbol self.timeframe = timeframe self.debug_mode = debug_mode self.standalone = standalone self.chart_title = chart_title or f"{symbol} Real-Time Chart" self.extended_hours = extended_hours self.enable_logging = enable_logging self.run_signal_interpreter = run_signal_interpreter self.historical_candles = historical_candles self.performance_monitoring = performance_monitoring self.max_memory_usage = max_memory_usage self.memory_check_interval = memory_check_interval self.tick_update_interval = tick_update_interval self.chart_update_interval = chart_update_interval self.show_volume = show_volume self.show_indicators = show_indicators self.custom_trades = custom_trades self.port = port self.height = height self.width = width self.positions_callback = positions_callback self.allow_synthetic_data = allow_synthetic_data # Initialize interval store self.interval_store = {'interval': 1} # Default to 1s timeframe # Initialize trading components self.agent = agent self.trading_env = trading_env # Initialize button styles for timeframe selection self.button_style = { 'background': '#343a40', 'color': 'white', 'border': 'none', 'padding': '10px 20px', 'margin': '0 5px', 'borderRadius': '4px', 'cursor': 'pointer' } self.active_button_style = { 'background': '#007bff', 'color': 'white', 'border': 'none', 'padding': '10px 20px', 'margin': '0 5px', 'borderRadius': '4px', 'cursor': 'pointer', 'fontWeight': 'bold' } # Initialize color schemes self.colors = { 'background': '#1e1e1e', 'text': '#ffffff', 'grid': '#333333', 'candle_up': '#26a69a', 'candle_down': '#ef5350', 'volume_up': 'rgba(38, 166, 154, 0.3)', 'volume_down': 'rgba(239, 83, 80, 0.3)', 'ma': '#ffeb3b', 'ema': '#29b6f6', 'bollinger_bands': '#ff9800', 'trades_buy': '#00e676', 'trades_sell': '#ff1744' } # Initialize data storage self.all_trades = [] # Store trades self.positions = [] # Store open positions self.latest_price = 0.0 self.latest_volume = 0.0 self.latest_timestamp = datetime.now() self.current_balance = 100.0 # Starting balance self.accumulative_pnl = 0.0 # Accumulated profit/loss # Initialize trade rate counter self.trade_count = 0 self.start_time = time.time() self.trades_per_second = 0 self.trades_per_minute = 0 self.trades_per_hour = 0 # Initialize trade rate tracking variables self.trade_times = [] # Store timestamps of recent trades for rate calculation self.last_trade_rate_calculation = datetime.now() self.trade_rate = {"per_second": 0, "per_minute": 0, "per_hour": 0} # Initialize interactive components self.app = app # Create a new app if not provided if self.app is None and standalone: self.app = dash.Dash( __name__, external_stylesheets=[dbc.themes.DARKLY], suppress_callback_exceptions=True ) # Initialize tick storage if not provided if tick_storage is None: # Check if TimescaleDB integration is enabled use_timescaledb = TIMESCALEDB_ENABLED and timescaledb_handler is not None # Create a new tick storage self.tick_storage = TickStorage( symbol=symbol, timeframes=["1s", "1m", "5m", "15m", "1h", "4h", "1d"], use_timescaledb=use_timescaledb ) # Load historical data immediately for cold start logger.info(f"Loading historical data for {symbol} during chart initialization") try: data_loaded = self.tick_storage.load_historical_data(symbol) if data_loaded: logger.info(f"Successfully loaded historical data for {symbol}") # Log what we have for tf in ["1s", "1m", "5m", "15m", "1h"]: candle_count = len(self.tick_storage.candles.get(tf, [])) logger.info(f" {tf}: {candle_count} candles") else: logger.warning(f"Failed to load historical data for {symbol}") except Exception as e: logger.error(f"Error loading historical data during initialization: {str(e)}") import traceback logger.error(traceback.format_exc()) else: self.tick_storage = tick_storage # Create layout and callbacks if app is provided if self.app is not None: # Create the layout self.app.layout = self._create_layout() # Register callbacks self._setup_callbacks() # Log initialization if self.enable_logging: logger.info(f"RealTimeChart initialized: {self.symbol} ({self.timeframe}) ") def _create_layout(self): return html.Div([ # Header section with title and current price html.Div([ html.H1(f"{self.symbol} Real-Time Chart", className="display-4"), # Current price ticker html.Div([ html.H4("Current Price:", style={"display": "inline-block", "marginRight": "10px"}), html.H3(id="current-price", style={"display": "inline-block", "color": "#17a2b8"}), html.Div([ html.H5("Balance:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}), html.H5(id="current-balance", style={"display": "inline-block", "color": "#28a745"}), ], style={"display": "inline-block", "marginLeft": "40px"}), html.Div([ html.H5("Accumulated PnL:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}), html.H5(id="accumulated-pnl", style={"display": "inline-block", "color": "#ffc107"}), ], style={"display": "inline-block", "marginLeft": "40px"}), # Add trade rate display html.Div([ html.H5("Trade Rate:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}), html.Span([ html.Span(id="trade-rate-second", style={"color": "#ff7f0e"}), html.Span("/s, "), html.Span(id="trade-rate-minute", style={"color": "#ff7f0e"}), html.Span("/m, "), html.Span(id="trade-rate-hour", style={"color": "#ff7f0e"}), html.Span("/h") ], style={"display": "inline-block"}), ], style={"display": "inline-block", "marginLeft": "40px"}), ], style={"textAlign": "center", "margin": "20px 0"}), ], style={"textAlign": "center", "marginBottom": "20px"}), # Add interval component for periodic updates dcc.Interval( id='interval-component', interval=500, # in milliseconds n_intervals=0 ), # Add timeframe selection buttons html.Div([ html.Button('1s', id='btn-1s', n_clicks=0, style=self.active_button_style), html.Button('5s', id='btn-5s', n_clicks=0, style=self.button_style), html.Button('15s', id='btn-15s', n_clicks=0, style=self.button_style), html.Button('1m', id='btn-1m', n_clicks=0, style=self.button_style), html.Button('5m', id='btn-5m', n_clicks=0, style=self.button_style), html.Button('15m', id='btn-15m', n_clicks=0, style=self.button_style), html.Button('1h', id='btn-1h', n_clicks=0, style=self.button_style), ], style={"textAlign": "center", "marginBottom": "20px"}), # Store for the selected timeframe dcc.Store(id='interval-store', data={'interval': 1}), # Chart content (without wrapper div to avoid callback issues) dcc.Graph(id='live-chart', style={"height": "600px"}), dcc.Graph(id='secondary-charts', style={"height": "500px"}), html.Div(id='positions-list') ]) def _create_chart_and_controls(self): """Create the chart and controls for the dashboard.""" try: # Get selected interval from the dashboard (default to 1s if not available) interval_seconds = 1 if hasattr(self, 'interval_store') and self.interval_store: interval_seconds = self.interval_store.get('interval', 1) # Create chart components chart_div = html.Div([ # Update chart with data for the selected interval dcc.Graph( id='live-chart', figure=self._update_main_chart(interval_seconds), style={"height": "600px"} ), # Update secondary charts dcc.Graph( id='secondary-charts', figure=self._update_secondary_charts(), style={"height": "500px"} ), # Update positions list html.Div( id='positions-list', children=self._get_position_list_rows() ) ]) return chart_div except Exception as e: logger.error(f"Error creating chart and controls: {str(e)}") import traceback logger.error(traceback.format_exc()) # Return a simple error message as fallback return html.Div(f"Error loading chart: {str(e)}", style={"color": "red", "padding": "20px"}) def _setup_callbacks(self): """Set up all the callbacks for the dashboard""" # Callback for timeframe selection @self.app.callback( [Output('interval-store', 'data'), Output('btn-1s', 'style'), Output('btn-5s', 'style'), Output('btn-15s', 'style'), Output('btn-1m', 'style'), Output('btn-5m', 'style'), Output('btn-15m', 'style'), Output('btn-1h', 'style')], [Input('btn-1s', 'n_clicks'), Input('btn-5s', 'n_clicks'), Input('btn-15s', 'n_clicks'), Input('btn-1m', 'n_clicks'), Input('btn-5m', 'n_clicks'), Input('btn-15m', 'n_clicks'), Input('btn-1h', 'n_clicks')], [dash.dependencies.State('interval-store', 'data')] ) def update_interval(n1, n5, n15, n60, n300, n900, n3600, data): ctx = dash.callback_context if not ctx.triggered: # Default state (1s selected) return ({'interval': 1}, self.active_button_style, self.button_style, self.button_style, self.button_style, self.button_style, self.button_style, self.button_style) button_id = ctx.triggered[0]['prop_id'].split('.')[0] # Initialize all buttons to inactive button_styles = [self.button_style] * 7 # Set the active button and interval if button_id == 'btn-1s': button_styles[0] = self.active_button_style return ({'interval': 1}, *button_styles) elif button_id == 'btn-5s': button_styles[1] = self.active_button_style return ({'interval': 5}, *button_styles) elif button_id == 'btn-15s': button_styles[2] = self.active_button_style return ({'interval': 15}, *button_styles) elif button_id == 'btn-1m': button_styles[3] = self.active_button_style return ({'interval': 60}, *button_styles) elif button_id == 'btn-5m': button_styles[4] = self.active_button_style return ({'interval': 300}, *button_styles) elif button_id == 'btn-15m': button_styles[5] = self.active_button_style return ({'interval': 900}, *button_styles) elif button_id == 'btn-1h': button_styles[6] = self.active_button_style return ({'interval': 3600}, *button_styles) # Default - keep current interval current_interval = data.get('interval', 1) # Set the appropriate button as active if current_interval == 1: button_styles[0] = self.active_button_style elif current_interval == 5: button_styles[1] = self.active_button_style elif current_interval == 15: button_styles[2] = self.active_button_style elif current_interval == 60: button_styles[3] = self.active_button_style elif current_interval == 300: button_styles[4] = self.active_button_style elif current_interval == 900: button_styles[5] = self.active_button_style elif current_interval == 3600: button_styles[6] = self.active_button_style return (data, *button_styles) # Main update callback @self.app.callback( [Output('live-chart', 'figure'), Output('secondary-charts', 'figure'), Output('positions-list', 'children'), Output('current-price', 'children'), Output('current-balance', 'children'), Output('accumulated-pnl', 'children'), Output('trade-rate-second', 'children'), Output('trade-rate-minute', 'children'), Output('trade-rate-hour', 'children')], [Input('interval-component', 'n_intervals'), Input('interval-store', 'data')] ) def update_all(n, interval_data): try: # Get selected interval interval = interval_data.get('interval', 1) # Get updated chart figures main_fig = self._update_main_chart(interval) secondary_fig = self._update_secondary_charts() # Get updated positions list positions = self._get_position_list_rows() # Format the current price current_price = "$ ---.--" if self.latest_price is not None: current_price = f"${self.latest_price:.2f}" # Format balance and PnL balance_text = f"${self.current_balance:.2f}" pnl_text = f"${self.accumulative_pnl:.2f}" # Get trade rate statistics trade_rate = self.calculate_trade_rate() per_second = f"{trade_rate['per_second']:.1f}" per_minute = f"{trade_rate['per_minute']:.1f}" per_hour = f"{trade_rate['per_hour']:.1f}" return main_fig, secondary_fig, positions, current_price, balance_text, pnl_text, per_second, per_minute, per_hour except Exception as e: logger.error(f"Error in update callback: {str(e)}") import traceback logger.error(traceback.format_exc()) # Return empty updates on error return {}, {}, [], "Error", "$0.00", "$0.00", "0.0", "0.0", "0.0" def _update_secondary_charts(self): """Update the secondary charts for other timeframes""" try: # For each timeframe, create a small chart secondary_timeframes = ['1m', '5m', '15m', '1h'] if not all(tf in self.tick_storage.candles for tf in secondary_timeframes): logger.warning("Not all secondary timeframes available") # Return empty figure with a message fig = make_subplots(rows=1, cols=4) for i, tf in enumerate(secondary_timeframes, 1): fig.add_annotation( text=f"No data for {tf}", xref=f"x{i}", yref=f"y{i}", x=0.5, y=0.5, showarrow=False ) return fig # Create subplots for each timeframe fig = make_subplots( rows=1, cols=4, subplot_titles=secondary_timeframes, shared_yaxes=True ) # Loop through each timeframe for i, timeframe in enumerate(secondary_timeframes, 1): interval_key = timeframe # Get candles for this timeframe if interval_key in self.tick_storage.candles and self.tick_storage.candles[interval_key]: # For rendering, limit to the last 100 candles for performance candles = self.tick_storage.candles[interval_key][-100:] if candles: # Extract OHLC values timestamps = [candle['timestamp'] for candle in candles] opens = [candle['open'] for candle in candles] highs = [candle['high'] for candle in candles] lows = [candle['low'] for candle in candles] closes = [candle['close'] for candle in candles] # Add candlestick trace fig.add_trace(go.Candlestick( x=timestamps, open=opens, high=highs, low=lows, close=closes, name=interval_key, increasing_line_color='rgba(0, 180, 0, 0.7)', decreasing_line_color='rgba(255, 0, 0, 0.7)', showlegend=False ), row=1, col=i) else: # Add empty annotation if no data fig.add_annotation( text=f"No data for {interval_key}", xref=f"x{i}", yref=f"y{i}", x=0.5, y=0.5, showarrow=False ) # Update layout fig.update_layout( height=250, template="plotly_dark", showlegend=False, margin=dict(l=0, r=0, t=30, b=0), ) # Format Y-axis with 2 decimal places fig.update_yaxes(tickformat=".2f") # Format X-axis to show only the date (no time) for i in range(1, 5): fig.update_xaxes( row=1, col=i, rangeslider_visible=False, rangebreaks=[dict(bounds=["sat", "mon"])], # hide weekends tickformat="%m-%d" # Show month-day only ) return fig except Exception as e: logger.error(f"Error updating secondary charts: {str(e)}") import traceback logger.error(traceback.format_exc()) # Return empty figure on error return make_subplots(rows=1, cols=4) def _get_position_list_rows(self): """Generate HTML for the positions list (last 10 positions only)""" try: if not hasattr(self, 'positions') or not self.positions: # Return placeholder if no positions return html.Div("No positions to display", style={"textAlign": "center", "padding": "20px"}) # Create table headers table_header = [ html.Thead(html.Tr([ html.Th("ID"), html.Th("Action"), html.Th("Entry Price"), html.Th("Exit Price"), html.Th("Amount"), html.Th("PnL"), html.Th("Time") ])) ] # Create table rows for only the last 10 positions to avoid overcrowding rows = [] last_positions = self.positions[-10:] if len(self.positions) > 10 else self.positions for position in last_positions: # Format times entry_time = position.entry_timestamp.strftime("%H:%M:%S") exit_time = position.exit_timestamp.strftime("%H:%M:%S") if position.exit_timestamp else "-" # Format PnL pnl_value = position.pnl if position.pnl is not None else 0 pnl_text = f"${pnl_value:.2f}" if position.pnl is not None else "-" pnl_style = {"color": "green" if position.pnl and position.pnl > 0 else "red"} # Create row row = html.Tr([ html.Td(position.trade_id), html.Td(position.action), html.Td(f"${position.entry_price:.2f}"), html.Td(f"${position.exit_price:.2f}" if position.exit_price else "-"), html.Td(f"{position.amount:.4f}"), html.Td(pnl_text, style=pnl_style), html.Td(f"{entry_time} → {exit_time}") ]) rows.append(row) table_body = [html.Tbody(rows)] # Add summary row for total PnL and other statistics total_trades = len(self.positions) winning_trades = sum(1 for p in self.positions if p.pnl and p.pnl > 0) win_rate = winning_trades / total_trades * 100 if total_trades > 0 else 0 # Format display colors for PnL pnl_color = "green" if self.accumulative_pnl >= 0 else "red" summary_row = html.Tr([ html.Td("SUMMARY", colSpan=2, style={"fontWeight": "bold"}), html.Td(f"Trades: {total_trades}"), html.Td(f"Win Rate: {win_rate:.1f}%"), html.Td("Total PnL:", style={"fontWeight": "bold"}), html.Td(f"${self.accumulative_pnl:.2f}", style={"color": pnl_color, "fontWeight": "bold"}), html.Td(f"Balance: ${self.current_balance:.2f}") ], style={"backgroundColor": "rgba(80, 80, 80, 0.3)"}) # Create the table with improved styling table = html.Table( table_header + table_body + [html.Tfoot([summary_row])], style={ "width": "100%", "textAlign": "center", "borderCollapse": "collapse", "marginTop": "20px" }, className="table table-striped table-dark" ) return table except Exception as e: logger.error(f"Error generating position list: {str(e)}") import traceback logger.error(traceback.format_exc()) return html.Div("Error displaying positions") def get_candles(self, interval_seconds=60): """Get candles for the specified interval""" try: # Get candles from tick storage interval_key = self._get_interval_key(interval_seconds) candles_list = self.tick_storage.get_candles(interval_key) if not candles_list: logger.warning(f"No candle data available for {interval_key} - trying to load historical data") # Try to load historical data if we don't have any self.tick_storage.load_historical_data(self.symbol) candles_list = self.tick_storage.get_candles(interval_key) if not candles_list: logger.error(f"Still no candle data available for {interval_key} after loading historical data") return [] logger.info(f"Retrieved {len(candles_list)} candles for {interval_key}") return candles_list except Exception as e: logger.error(f"Error getting candles: {str(e)}") import traceback logger.error(traceback.format_exc()) return [] # Return empty list on error def _get_interval_key(self, interval_seconds): """Convert interval seconds to a key used in the tick storage""" if interval_seconds < 60: return f"{interval_seconds}s" elif interval_seconds < 3600: return f"{interval_seconds // 60}m" elif interval_seconds < 86400: return f"{interval_seconds // 3600}h" else: return f"{interval_seconds // 86400}d" def calculate_trade_rate(self): """Calculate and return trading rate statistics""" now = datetime.now() # Only calculate once per second to avoid unnecessary processing if (now - self.last_trade_rate_calculation).total_seconds() < 1.0: return self.trade_rate self.last_trade_rate_calculation = now # Clean up old trade times (older than 1 hour) one_hour_ago = now - timedelta(hours=1) self.trade_times = [t for t in self.trade_times if t > one_hour_ago] if not self.trade_times: self.trade_rate = {"per_second": 0, "per_minute": 0, "per_hour": 0} return self.trade_rate # Calculate rates based on time windows last_second = now - timedelta(seconds=1) last_minute = now - timedelta(minutes=1) # Count trades in each time window trades_last_second = sum(1 for t in self.trade_times if t > last_second) trades_last_minute = sum(1 for t in self.trade_times if t > last_minute) trades_last_hour = len(self.trade_times) # All remaining trades are from last hour # Calculate rates self.trade_rate = { "per_second": trades_last_second, "per_minute": trades_last_minute, "per_hour": trades_last_hour } return self.trade_rate def _update_chart_and_positions(self): """Update the chart with current data and positions""" try: # Force an update of the charts self._update_main_chart(1) # Update 1s chart by default self._update_secondary_charts() logger.debug("Updated charts and positions") return True except Exception as e: logger.error(f"Error updating chart and positions: {str(e)}") import traceback logger.error(traceback.format_exc()) return False async def start_websocket(self): """Start the websocket connection for real-time data""" try: # Step 1: Clear everything related to positions FIRST, before any other operations logger.info(f"Initializing fresh chart for {self.symbol} - clearing all previous positions") self.positions = [] # Clear positions list self.accumulative_pnl = 0.0 # Reset accumulated PnL self.current_balance = 100.0 # Reset balance # Step 2: Clear any previous tick data to avoid using stale data from previous training sessions self.tick_storage.ticks = [] # Step 3: Clear any previous 1s candles before loading historical data self.tick_storage.candles['1s'] = [] logger.info("Initialized empty 1s candles, tick collection, and positions for fresh data") # Load historical data first to ensure we have candles for all timeframes logger.info(f"Loading historical data for {self.symbol}") # Load historical data directly from tick_storage self.tick_storage.load_historical_data(self.symbol) # Double check that we have the 1s timeframe initialized if '1s' not in self.tick_storage.candles: self.tick_storage.candles['1s'] = [] logger.info(f"After loading historical data, 1s candles count: {len(self.tick_storage.candles['1s'])}") # Make sure we update the charts once with historical data before websocket starts # Update all the charts with the initial historical data self._update_chart_and_positions() # Initialize websocket self.websocket = ExchangeWebSocket(self.symbol) await self.websocket.connect() logger.info(f"WebSocket connected for {self.symbol}") # Counter for received ticks tick_count = 0 last_update_time = time.time() # Start receiving data while self.websocket.running: try: data = await self.websocket.receive() if data: # Process the received data if 'price' in data: tick_count += 1 # Create a proper tick with timestamp object tick = { 'price': data['price'], 'quantity': data.get('volume', 0), 'timestamp': pd.Timestamp(data['timestamp'], unit='ms') } # Update tick storage with proper tick object self.tick_storage.add_tick(tick) # Store latest values self.latest_price = data['price'] self.latest_volume = data.get('volume', 0) self.latest_timestamp = datetime.fromtimestamp(data['timestamp'] / 1000) # Force chart update every 5 seconds current_time = time.time() if current_time - last_update_time >= 5.0: self._update_chart_and_positions() last_update_time = current_time logger.debug("Forced chart update after new ticks") # Log tick processing for debugging (every 100 ticks) if tick_count % 100 == 0: logger.info(f"Processed {tick_count} ticks, current price: ${self.latest_price:.2f}") logger.info(f"Current 1s candles count: {len(self.tick_storage.candles['1s'])}") except Exception as e: logger.error(f"Error processing websocket data: {str(e)}") await asyncio.sleep(1) # Wait before retrying except Exception as e: logger.error(f"WebSocket error for {self.symbol}: {str(e)}") import traceback logger.error(traceback.format_exc()) finally: if hasattr(self, 'websocket'): await self.websocket.close() def run(self, host='localhost', port=8050): """Run the Dash app on the specified host and port""" try: logger.info("="*60) logger.info(f"🚀 STARTING WEB UI FOR {self.symbol}") logger.info(f"📱 Web interface available at: http://{host}:{port}/") logger.info(f"🌐 Open this URL in your browser to view the trading chart") logger.info("="*60) self.app.run(debug=False, use_reloader=False, host=host, port=port) except Exception as e: logger.error(f"Error running Dash app: {str(e)}") import traceback logger.error(traceback.format_exc()) def add_trade(self, action, price, amount, timestamp): """ Adds a trade to the chart's positions and tracks the trade time. Handles closing previous open positions if necessary. """ try: trade_timestamp = datetime.fromtimestamp(timestamp / 1000) if isinstance(timestamp, (int, float)) else timestamp trade_timestamp = trade_timestamp or datetime.now() # Close the previous open position if this trade is opposite last_open_position = next((pos for pos in reversed(self.positions) if pos.is_open), None) if last_open_position: if (action == 'SELL' and last_open_position.action == 'BUY') or \ (action == 'BUY' and last_open_position.action == 'SELL'): closed_pnl = last_open_position.close(price, trade_timestamp) self.accumulative_pnl += closed_pnl self.current_balance += closed_pnl # Simplified balance update logger.info(f"Closed {last_open_position.action} position {last_open_position.trade_id} at {price:.2f}. PnL: {closed_pnl:.4f}") # Create and add the new position new_position = Position( action=action, entry_price=price, amount=amount, timestamp=trade_timestamp ) self.positions.append(new_position) self.trade_times.append(datetime.now()) # Use current time for rate calculation accuracy logger.info(f"Added new {action} position {new_position.trade_id} at {price:.2f}, Time: {trade_timestamp}") # Limit the number of stored positions and trade times to prevent memory issues max_history = 1000 if len(self.positions) > max_history: self.positions = self.positions[-max_history:] if len(self.trade_times) > max_history * 5: # Keep more trade times for rate calc self.trade_times = self.trade_times[-max_history*5:] except Exception as e: logger.error(f"Error adding trade to chart: {e}") import traceback logger.error(traceback.format_exc()) def add_nn_signal(self, symbol, signal, confidence, timestamp): # Placeholder for adding NN signals if needed in the future pass def _update_main_chart(self, interval_seconds): """Update the main chart for the specified interval""" try: # Convert interval seconds to timeframe key interval_key = self._get_interval_key(interval_seconds) # Get candles for this timeframe if interval_key not in self.tick_storage.candles or not self.tick_storage.candles[interval_key]: logger.warning(f"No candle data available for {interval_key}") # Return empty figure with a message fig = go.Figure() fig.add_annotation( text=f"No data available for {interval_key}", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=20, color="white") ) fig.update_layout( template="plotly_dark", height=600, title=f"{self.symbol} - {interval_key} Chart", xaxis_title="Time", yaxis_title="Price ($)" ) return fig # Get candles (limit to last 500 for performance) candles = self.tick_storage.candles[interval_key][-500:] if not candles: # Return empty figure if no candles fig = go.Figure() fig.add_annotation( text=f"No candles available for {interval_key}", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=20, color="white") ) fig.update_layout( template="plotly_dark", height=600, title=f"{self.symbol} - {interval_key} Chart" ) return fig # Extract OHLC values timestamps = [candle['timestamp'] for candle in candles] opens = [candle['open'] for candle in candles] highs = [candle['high'] for candle in candles] lows = [candle['low'] for candle in candles] closes = [candle['close'] for candle in candles] volumes = [candle['volume'] for candle in candles] # Create the main figure fig = go.Figure() # Add candlestick trace fig.add_trace(go.Candlestick( x=timestamps, open=opens, high=highs, low=lows, close=closes, name=f"{self.symbol}", increasing_line_color='rgba(0, 200, 0, 0.8)', decreasing_line_color='rgba(255, 0, 0, 0.8)', increasing_fillcolor='rgba(0, 200, 0, 0.3)', decreasing_fillcolor='rgba(255, 0, 0, 0.3)' )) # Add trade markers if we have positions if hasattr(self, 'positions') and self.positions: # Get recent positions (last 50 to avoid clutter) recent_positions = self.positions[-50:] if len(self.positions) > 50 else self.positions for position in recent_positions: # Add entry marker fig.add_trace(go.Scatter( x=[position.entry_timestamp], y=[position.entry_price], mode='markers', marker=dict( symbol='triangle-up' if position.action == 'BUY' else 'triangle-down', size=12, color='green' if position.action == 'BUY' else 'red', line=dict(width=2, color='white') ), name=f"{position.action} Entry", hovertemplate=f"{position.action} Entry
" + f"Price: ${position.entry_price:.2f}
" + f"Time: {position.entry_timestamp}
" + f"ID: {position.trade_id}", showlegend=False )) # Add exit marker if position is closed if not position.is_open and position.exit_price and position.exit_timestamp: fig.add_trace(go.Scatter( x=[position.exit_timestamp], y=[position.exit_price], mode='markers', marker=dict( symbol='triangle-down' if position.action == 'BUY' else 'triangle-up', size=12, color='blue', line=dict(width=2, color='white') ), name=f"{position.action} Exit", hovertemplate=f"{position.action} Exit
" + f"Price: ${position.exit_price:.2f}
" + f"Time: {position.exit_timestamp}
" + f"PnL: ${position.pnl:.2f}
" + f"ID: {position.trade_id}", showlegend=False )) # Update layout fig.update_layout( template="plotly_dark", height=600, title=f"{self.symbol} - {interval_key} Chart (Live Trading)", xaxis_title="Time", yaxis_title="Price ($)", showlegend=True, margin=dict(l=0, r=0, t=40, b=0), xaxis_rangeslider_visible=False, hovermode='x unified' ) # Format Y-axis with appropriate decimal places fig.update_yaxes(tickformat=".2f") # Format X-axis fig.update_xaxes( rangeslider_visible=False, type='date' ) return fig except Exception as e: logger.error(f"Error updating main chart: {str(e)}") import traceback logger.error(traceback.format_exc()) # Return error figure fig = go.Figure() fig.add_annotation( text=f"Error loading chart: {str(e)}", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=16, color="red") ) fig.update_layout( template="plotly_dark", height=600, title=f"{self.symbol} Chart - Error" ) return fig class BinanceWebSocket: """Binance WebSocket implementation for real-time tick data""" def __init__(self, symbol: str): self.symbol = symbol.replace('/', '').lower() self.ws = None self.running = False self.reconnect_delay = 1 self.max_reconnect_delay = 60 self.message_count = 0 # Binance WebSocket configuration self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@trade" logger.info(f"Initialized Binance WebSocket for symbol: {self.symbol}") async def connect(self): while True: try: logger.info(f"Attempting to connect to {self.ws_url}") self.ws = await websockets.connect(self.ws_url) logger.info("WebSocket connection established") self.running = True self.reconnect_delay = 1 logger.info(f"Successfully connected to Binance WebSocket for {self.symbol}") return True except Exception as e: logger.error(f"WebSocket connection error: {str(e)}") await asyncio.sleep(self.reconnect_delay) self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay) continue async def receive(self) -> Optional[Dict]: if not self.ws: return None try: message = await self.ws.recv() self.message_count += 1 if self.message_count % 100 == 0: # Log every 100th message to avoid spam logger.info(f"Received message #{self.message_count}") logger.debug(f"Raw message: {message[:200]}...") data = json.loads(message) # Process trade data if 'e' in data and data['e'] == 'trade': trade_data = { 'timestamp': data['T'], # Trade time 'price': float(data['p']), # Price 'volume': float(data['q']), # Quantity 'type': 'trade' } logger.debug(f"Processed trade data: {trade_data}") return trade_data return None except websockets.exceptions.ConnectionClosed: logger.warning("WebSocket connection closed") self.running = False return None except json.JSONDecodeError as e: logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...") return None except Exception as e: logger.error(f"Error receiving message: {str(e)}") return None async def close(self): """Close the WebSocket connection""" if self.ws: await self.ws.close() class ExchangeWebSocket: """Generic WebSocket interface for cryptocurrency exchanges""" def __init__(self, symbol: str, exchange: str = "binance"): self.symbol = symbol self.exchange = exchange.lower() self.ws = None # Initialize the appropriate WebSocket implementation if self.exchange == "binance": self.ws = BinanceWebSocket(symbol) else: raise ValueError(f"Unsupported exchange: {exchange}") async def connect(self): """Connect to the exchange WebSocket""" return await self.ws.connect() async def receive(self) -> Optional[Dict]: """Receive data from the WebSocket""" return await self.ws.receive() async def close(self): """Close the WebSocket connection""" await self.ws.close() @property def running(self): """Check if the WebSocket is running""" return self.ws.running if self.ws else False async def main(): global charts # Make charts globally accessible for NN integration symbols = ["ETH/USDT"] # Only use one symbol to simplify debugging logger.info(f"Starting application for symbols: {symbols}") # Initialize neural network if enabled if NN_ENABLED: logger.info("Initializing Neural Network integration...") if setup_neural_network(): logger.info("Neural Network integration initialized successfully") else: logger.warning("Neural Network integration failed to initialize") charts = [] websocket_tasks = [] # Create a chart and websocket task for each symbol for symbol in symbols: try: # Create a proper Dash app for each chart app = dash.Dash(__name__, external_stylesheets=[dbc.themes.DARKLY], suppress_callback_exceptions=True) # Initialize the chart with the app chart = RealTimeChart( app=app, symbol=symbol, timeframe='1m', standalone=True, chart_title=f"{symbol} Realtime Trading Chart", debug_mode=True, port=8050, height=800, width=1200 ) charts.append(chart) # Start the chart's websocket in a separate task websocket_task = asyncio.create_task(chart.start_websocket()) websocket_tasks.append(websocket_task) # Run the Dash app in a separate thread port = 8050 + len(charts) - 1 # Use different ports for each chart logger.info(f"Starting chart for {chart.symbol} on port {port}") thread = Thread(target=lambda c=chart, p=port: c.run(port=p)) thread.daemon = True thread.start() logger.info(f"Thread started for {chart.symbol} on port {port}") except Exception as e: logger.error(f"Error initializing chart for {symbol}: {str(e)}") import traceback logger.error(traceback.format_exc()) try: # Keep the main task running while True: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("Shutting down...") except Exception as e: logger.error(f"Unexpected error: {str(e)}") finally: for task in websocket_tasks: task.cancel() try: await task except asyncio.CancelledError: pass if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.info("Application terminated by user")