import asyncio import json import logging # Fix PIL import issue that causes plotly JSON serialization errors import os os.environ['MPLBACKEND'] = 'Agg' # Use non-interactive backend try: # Try to fix PIL import issue import PIL.Image # Disable PIL in plotly to prevent circular import issues import plotly.io as pio pio.kaleido.scope.default_format = "png" except ImportError: pass except Exception: # Suppress any PIL-related errors during import pass from typing import Dict, List, Optional, Tuple, Union import websockets import plotly.graph_objects as go from plotly.subplots import make_subplots import dash from dash import html, dcc from dash.dependencies import Input, Output import pandas as pd import numpy as np from collections import deque import time from threading import Thread import requests import os from datetime import datetime, timedelta import pytz import tzlocal import threading import random import dash_bootstrap_components as dbc import uuid import ta from sklearn.preprocessing import MinMaxScaler import re import psutil import gc import websocket # Import psycopg2 with error handling try: import psycopg2 PSYCOPG2_AVAILABLE = True except ImportError: PSYCOPG2_AVAILABLE = False psycopg2 = None # TimescaleDB configuration from environment variables TIMESCALEDB_ENABLED = os.environ.get('TIMESCALEDB_ENABLED', '1') == '1' and PSYCOPG2_AVAILABLE TIMESCALEDB_HOST = os.environ.get('TIMESCALEDB_HOST', '192.168.0.10') TIMESCALEDB_PORT = int(os.environ.get('TIMESCALEDB_PORT', '5432')) TIMESCALEDB_USER = os.environ.get('TIMESCALEDB_USER', 'postgres') TIMESCALEDB_PASSWORD = os.environ.get('TIMESCALEDB_PASSWORD', 'timescaledbpass') TIMESCALEDB_DB = os.environ.get('TIMESCALEDB_DB', 'candles') class TimescaleDBHandler: """Handler for TimescaleDB operations for candle storage and retrieval""" def __init__(self): """Initialize TimescaleDB connection if enabled""" self.enabled = TIMESCALEDB_ENABLED self.conn = None if not self.enabled: if not PSYCOPG2_AVAILABLE: print("psycopg2 module not available. TimescaleDB integration disabled.") return try: # Connect to TimescaleDB self.conn = psycopg2.connect( host=TIMESCALEDB_HOST, port=TIMESCALEDB_PORT, user=TIMESCALEDB_USER, password=TIMESCALEDB_PASSWORD, dbname=TIMESCALEDB_DB ) print(f"Connected to TimescaleDB at {TIMESCALEDB_HOST}:{TIMESCALEDB_PORT}") # Ensure the candles table exists self._ensure_table() print("TimescaleDB integration initialized successfully") except Exception as e: print(f"Error connecting to TimescaleDB: {str(e)}") self.enabled = False self.conn = None def _ensure_table(self): """Ensure the candles table exists with TimescaleDB hypertable""" if not self.conn: return try: with self.conn.cursor() as cur: # Create the candles table if it doesn't exist cur.execute(''' CREATE TABLE IF NOT EXISTS candles ( symbol TEXT, interval TEXT, timestamp TIMESTAMPTZ, open DOUBLE PRECISION, high DOUBLE PRECISION, low DOUBLE PRECISION, close DOUBLE PRECISION, volume DOUBLE PRECISION, PRIMARY KEY (symbol, interval, timestamp) ); ''') # Check if the table is already a hypertable cur.execute(''' SELECT EXISTS ( SELECT 1 FROM timescaledb_information.hypertables WHERE hypertable_name = 'candles' ); ''') is_hypertable = cur.fetchone()[0] # Convert to hypertable if not already done if not is_hypertable: cur.execute(''' SELECT create_hypertable('candles', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE ); ''') self.conn.commit() print("TimescaleDB table structure verified") except Exception as e: print(f"Error setting up TimescaleDB tables: {str(e)}") self.enabled = False def upsert_candle(self, symbol, interval, candle): """Insert or update a candle in TimescaleDB""" if not self.enabled or not self.conn: return False try: with self.conn.cursor() as cur: cur.execute(''' INSERT INTO candles ( symbol, interval, timestamp, open, high, low, close, volume ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT (symbol, interval, timestamp) DO UPDATE SET open = EXCLUDED.open, high = EXCLUDED.high, low = EXCLUDED.low, close = EXCLUDED.close, volume = EXCLUDED.volume ''', ( symbol, interval, candle['timestamp'], candle['open'], candle['high'], candle['low'], candle['close'], candle['volume'] )) self.conn.commit() return True except Exception as e: print(f"Error upserting candle to TimescaleDB: {str(e)}") # Try to reconnect on error try: self.conn = psycopg2.connect( host=TIMESCALEDB_HOST, port=TIMESCALEDB_PORT, user=TIMESCALEDB_USER, password=TIMESCALEDB_PASSWORD, dbname=TIMESCALEDB_DB ) except: pass return False def fetch_candles(self, symbol, interval, limit=1000): """Fetch candles from TimescaleDB""" if not self.enabled or not self.conn: return [] try: with self.conn.cursor() as cur: cur.execute(''' SELECT timestamp, open, high, low, close, volume FROM candles WHERE symbol = %s AND interval = %s ORDER BY timestamp DESC LIMIT %s ''', (symbol, interval, limit)) rows = cur.fetchall() # Convert to list of dictionaries (ordered from oldest to newest) candles = [] for row in reversed(rows): # Reverse to get oldest first candle = { 'timestamp': row[0], 'open': row[1], 'high': row[2], 'low': row[3], 'close': row[4], 'volume': row[5] } candles.append(candle) return candles except Exception as e: print(f"Error fetching candles from TimescaleDB: {str(e)}") # Try to reconnect on error try: self.conn = psycopg2.connect( host=TIMESCALEDB_HOST, port=TIMESCALEDB_PORT, user=TIMESCALEDB_USER, password=TIMESCALEDB_PASSWORD, dbname=TIMESCALEDB_DB ) except: pass return [] class BinanceHistoricalData: """ Class for fetching historical price data from Binance. """ def __init__(self): self.base_url = "https://api.binance.com/api/v3" self.cache_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cache') if not os.path.exists(self.cache_dir): os.makedirs(self.cache_dir) # Timestamp of last data update self.last_update = None def get_historical_candles(self, symbol, interval_seconds=3600, limit=1000): """ Fetch historical candles from Binance API. Args: symbol (str): Trading pair symbol (e.g., "BTC/USDT") interval_seconds (int): Timeframe in seconds (e.g., 3600 for 1h) limit (int): Number of candles to fetch Returns: pd.DataFrame: DataFrame with OHLCV data """ # Convert interval_seconds to Binance interval format interval_map = { 1: "1s", 60: "1m", 300: "5m", 900: "15m", 1800: "30m", 3600: "1h", 14400: "4h", 86400: "1d" } interval = interval_map.get(interval_seconds, "1h") # Format symbol for Binance API (remove slash and make uppercase) formatted_symbol = symbol.replace("/", "").upper() # Check if we have cached data first cache_file = self._get_cache_filename(formatted_symbol, interval) cached_data = self._load_from_cache(formatted_symbol, interval) # If we have cached data that's recent enough, use it if cached_data is not None and len(cached_data) >= limit: cache_age_minutes = (datetime.now() - self.last_update).total_seconds() / 60 if self.last_update else 60 if cache_age_minutes < 15: # Only use cache if it's less than 15 minutes old logger.info(f"Using cached historical data for {symbol} ({interval})") return cached_data try: # Build URL for klines endpoint url = f"{self.base_url}/klines" params = { "symbol": formatted_symbol, "interval": interval, "limit": limit } # Make the request response = requests.get(url, params=params) response.raise_for_status() # Parse the response data = response.json() # Create dataframe df = pd.DataFrame(data, columns=[ "timestamp", "open", "high", "low", "close", "volume", "close_time", "quote_asset_volume", "number_of_trades", "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" ]) # Convert timestamp to datetime df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") # Convert price columns to float for col in ["open", "high", "low", "close", "volume"]: df[col] = df[col].astype(float) # Sort by timestamp df = df.sort_values("timestamp") # Save to cache for future use self._save_to_cache(df, formatted_symbol, interval) self.last_update = datetime.now() logger.info(f"Fetched {len(df)} candles for {symbol} ({interval})") return df except Exception as e: logger.error(f"Error fetching historical data from Binance: {str(e)}") # Return cached data if we have it, even if it's not enough if cached_data is not None: logger.warning(f"Using cached data instead (may be incomplete)") return cached_data # Return empty dataframe on error return pd.DataFrame() def _get_cache_filename(self, symbol, interval): """Get filename for cache file""" return os.path.join(self.cache_dir, f"{symbol}_{interval}_candles.csv") def _load_from_cache(self, symbol, interval): """Load candles from cache file""" try: cache_file = self._get_cache_filename(symbol, interval) if os.path.exists(cache_file): # For 1s interval, check if the cache is recent (less than 10 minutes old) if interval == "1s" or interval == 1: file_mod_time = datetime.fromtimestamp(os.path.getmtime(cache_file)) time_diff = (datetime.now() - file_mod_time).total_seconds() / 60 if time_diff > 10: logger.info("1s cache is older than 10 minutes, skipping load") return None logger.info(f"Using recent 1s cache (age: {time_diff:.1f} minutes)") df = pd.read_csv(cache_file) df["timestamp"] = pd.to_datetime(df["timestamp"]) logger.info(f"Loaded {len(df)} candles from cache: {cache_file}") return df except Exception as e: logger.error(f"Error loading cached data: {str(e)}") return None def _save_to_cache(self, df, symbol, interval): """Save candles to cache file""" try: cache_file = self._get_cache_filename(symbol, interval) df.to_csv(cache_file, index=False) logger.info(f"Saved {len(df)} candles to cache: {cache_file}") return True except Exception as e: logger.error(f"Error saving to cache: {str(e)}") return False def get_recent_trades(self, symbol, limit=1000): """Get recent trades for a symbol""" formatted_symbol = symbol.replace("/", "") try: url = f"{self.base_url}/trades" params = { "symbol": formatted_symbol, "limit": limit } response = requests.get(url, params=params) response.raise_for_status() data = response.json() # Create dataframe df = pd.DataFrame(data) df["time"] = pd.to_datetime(df["time"], unit="ms") df["price"] = df["price"].astype(float) df["qty"] = df["qty"].astype(float) return df except Exception as e: logger.error(f"Error fetching recent trades: {str(e)}") return pd.DataFrame() class MultiTimeframeDataInterface: """ Enhanced Data Interface supporting: - Multiple trading pairs - Multiple timeframes per pair (1s, 1m, 1h, 1d + custom) - Technical indicators - Cross-timeframe normalization - Real-time data updates """ def __init__(self, symbol=None, timeframes=None, data_dir="data"): """ Initialize the data interface. Args: symbol (str): Trading pair symbol (e.g., "BTC/USDT") timeframes (list): List of timeframes to use (e.g., ['1m', '5m', '1h', '4h', '1d']) data_dir (str): Directory to store/load datasets """ self.symbol = symbol self.timeframes = timeframes or ['1h', '4h', '1d'] self.data_dir = data_dir self.scalers = {} # Store scalers for each timeframe # Initialize the historical data fetcher self.historical_data = BinanceHistoricalData() # Create data directory if it doesn't exist os.makedirs(self.data_dir, exist_ok=True) # Initialize empty dataframes for each timeframe self.dataframes = {tf: None for tf in self.timeframes} # Store timestamps of last updates per timeframe self.last_updates = {tf: None for tf in self.timeframes} # Timeframe mapping (string to seconds) self.timeframe_to_seconds = { '1s': 1, '1m': 60, '5m': 300, '15m': 900, '30m': 1800, '1h': 3600, '4h': 14400, '1d': 86400 } logger.info(f"MultiTimeframeDataInterface initialized for {symbol} with timeframes {timeframes}") def get_data(self, timeframe='1h', n_candles=1000, refresh=False, add_indicators=True): """ Fetch historical price data for a given timeframe with optional indicators. Args: timeframe (str): Timeframe to fetch data for n_candles (int): Number of candles to fetch refresh (bool): Force refresh of the data add_indicators (bool): Whether to add technical indicators Returns: pd.DataFrame: DataFrame with OHLCV data and indicators """ # Check if we need to refresh current_time = datetime.now() if (not refresh and self.dataframes[timeframe] is not None and self.last_updates[timeframe] is not None and (current_time - self.last_updates[timeframe]).total_seconds() < 60): logger.info(f"Using cached data for {self.symbol} {timeframe}") return self.dataframes[timeframe] interval_seconds = self.timeframe_to_seconds.get(timeframe, 3600) # Fetch data df = self.historical_data.get_historical_candles( symbol=self.symbol, interval_seconds=interval_seconds, limit=n_candles ) if df is None or df.empty: logger.error(f"No data available for {self.symbol} {timeframe}") return None # Add indicators if requested if add_indicators: df = self.add_indicators(df) # Store in cache self.dataframes[timeframe] = df self.last_updates[timeframe] = current_time logger.info(f"Fetched and processed {len(df)} candles for {self.symbol} {timeframe}") return df def add_indicators(self, df): """ Add comprehensive technical indicators to the dataframe. Args: df (pd.DataFrame): DataFrame with OHLCV data Returns: pd.DataFrame: DataFrame with added technical indicators """ # Make a copy to avoid modifying the original df_copy = df.copy() # Basic price indicators df_copy['returns'] = df_copy['close'].pct_change() df_copy['log_returns'] = np.log(df_copy['close'] / df_copy['close'].shift(1)) # Moving Averages df_copy['sma_7'] = ta.trend.sma_indicator(df_copy['close'], window=7) df_copy['sma_25'] = ta.trend.sma_indicator(df_copy['close'], window=25) df_copy['sma_99'] = ta.trend.sma_indicator(df_copy['close'], window=99) df_copy['ema_9'] = ta.trend.ema_indicator(df_copy['close'], window=9) df_copy['ema_21'] = ta.trend.ema_indicator(df_copy['close'], window=21) # MACD macd = ta.trend.MACD(df_copy['close']) df_copy['macd'] = macd.macd() df_copy['macd_signal'] = macd.macd_signal() df_copy['macd_diff'] = macd.macd_diff() # RSI df_copy['rsi'] = ta.momentum.rsi(df_copy['close'], window=14) # Bollinger Bands bollinger = ta.volatility.BollingerBands(df_copy['close']) df_copy['bb_high'] = bollinger.bollinger_hband() df_copy['bb_low'] = bollinger.bollinger_lband() df_copy['bb_pct'] = bollinger.bollinger_pband() # Stochastic Oscillator stoch = ta.momentum.StochasticOscillator(df_copy['high'], df_copy['low'], df_copy['close']) df_copy['stoch_k'] = stoch.stoch() df_copy['stoch_d'] = stoch.stoch_signal() # ATR - Average True Range df_copy['atr'] = ta.volatility.average_true_range(df_copy['high'], df_copy['low'], df_copy['close'], window=14) # Money Flow Index df_copy['mfi'] = ta.volume.money_flow_index(df_copy['high'], df_copy['low'], df_copy['close'], df_copy['volume'], window=14) # OBV - On-Balance Volume df_copy['obv'] = ta.volume.on_balance_volume(df_copy['close'], df_copy['volume']) # Ichimoku Cloud ichimoku = ta.trend.IchimokuIndicator(df_copy['high'], df_copy['low']) df_copy['ichimoku_a'] = ichimoku.ichimoku_a() df_copy['ichimoku_b'] = ichimoku.ichimoku_b() df_copy['ichimoku_base'] = ichimoku.ichimoku_base_line() df_copy['ichimoku_conv'] = ichimoku.ichimoku_conversion_line() # ADX - Average Directional Index adx = ta.trend.ADXIndicator(df_copy['high'], df_copy['low'], df_copy['close']) df_copy['adx'] = adx.adx() df_copy['adx_pos'] = adx.adx_pos() df_copy['adx_neg'] = adx.adx_neg() # VWAP - Volume Weighted Average Price (intraday) # Custom calculation since TA library doesn't include VWAP df_copy['vwap'] = (df_copy['volume'] * (df_copy['high'] + df_copy['low'] + df_copy['close']) / 3).cumsum() / df_copy['volume'].cumsum() # Fill NaN values df_copy = df_copy.fillna(method='bfill').fillna(0) return df_copy def get_multi_timeframe_data(self, timeframes=None, n_candles=1000, refresh=False, add_indicators=True): """ Fetch data for multiple timeframes. Args: timeframes (list): List of timeframes to fetch n_candles (int): Number of candles to fetch for each timeframe refresh (bool): Force refresh of the data add_indicators (bool): Whether to add technical indicators Returns: dict: Dictionary of dataframes indexed by timeframe """ if timeframes is None: timeframes = self.timeframes result = {} for tf in timeframes: # For higher timeframes, we need fewer candles tf_candles = n_candles if tf == '4h': tf_candles = max(250, n_candles // 4) elif tf == '1d': tf_candles = max(100, n_candles // 24) df = self.get_data(timeframe=tf, n_candles=tf_candles, refresh=refresh, add_indicators=add_indicators) if df is not None and not df.empty: result[tf] = df return result def prepare_training_data(self, window_size=20, train_ratio=0.8, refresh=False): """ Prepare training data from multiple timeframes. Args: window_size (int): Size of the sliding window train_ratio (float): Ratio of data to use for training refresh (bool): Whether to refresh the data Returns: tuple: (X_train, y_train, X_val, y_val, train_prices, val_prices) """ # Get data for all timeframes data_dict = self.get_multi_timeframe_data(refresh=refresh) if not data_dict: logger.error("Failed to fetch data for any timeframe") return None, None, None, None, None, None # Align all dataframes by timestamp all_dfs = list(data_dict.values()) min_date = max([df['timestamp'].min() for df in all_dfs]) max_date = min([df['timestamp'].max() for df in all_dfs]) aligned_dfs = {} for tf, df in data_dict.items(): aligned_df = df[(df['timestamp'] >= min_date) & (df['timestamp'] <= max_date)] aligned_dfs[tf] = aligned_df # Choose the lowest timeframe as the reference for time alignment reference_tf = min(self.timeframes, key=lambda x: self.timeframe_to_seconds.get(x, 3600)) reference_df = aligned_dfs[reference_tf] # Create sliding windows for each timeframe X_dict = {} for tf, df in aligned_dfs.items(): # Drop timestamp and create numeric features features = df.drop('timestamp', axis=1).values # Ensure the feature array is 3D: [samples, window, features] X = np.array([features[i:i+window_size] for i in range(len(features)-window_size)]) X_dict[tf] = X # Create target labels based on future price movements reference_prices = reference_df['close'].values future_prices = reference_prices[window_size:] current_prices = reference_prices[window_size-1:-1] # Calculate returns returns = (future_prices - current_prices) / current_prices # Create labels: 0=SELL, 1=HOLD, 2=BUY threshold = 0.0005 # 0.05% threshold y = np.zeros(len(returns), dtype=int) y[returns > threshold] = 2 # BUY y[returns < -threshold] = 0 # SELL y[(returns >= -threshold) & (returns <= threshold)] = 1 # HOLD # Split into training and validation sets split_idx = int(len(y) * train_ratio) X_train_dict = {tf: X[:split_idx] for tf, X in X_dict.items()} X_val_dict = {tf: X[split_idx:] for tf, X in X_dict.items()} y_train = y[:split_idx] y_val = y[split_idx:] train_prices = reference_prices[window_size-1:window_size-1+split_idx] val_prices = reference_prices[window_size-1+split_idx:window_size-1+len(y)] logger.info(f"Prepared training data - Train: {len(y_train)}, Val: {len(y_val)}") return X_train_dict, y_train, X_val_dict, y_val, train_prices, val_prices def normalize_data(self, data_dict, fit=True): """ Normalize data across all timeframes. Args: data_dict (dict): Dictionary of data arrays by timeframe fit (bool): Whether to fit new scalers or use existing ones Returns: dict: Dictionary of normalized data arrays """ result = {} for tf, data in data_dict.items(): # For 3D data [samples, window, features] if len(data.shape) == 3: samples, window, features = data.shape reshaped = data.reshape(-1, features) if fit or tf not in self.scalers: self.scalers[tf] = MinMaxScaler() normalized = self.scalers[tf].fit_transform(reshaped) else: normalized = self.scalers[tf].transform(reshaped) result[tf] = normalized.reshape(samples, window, features) # For 2D data [samples, features] elif len(data.shape) == 2: if fit or tf not in self.scalers: self.scalers[tf] = MinMaxScaler() result[tf] = self.scalers[tf].fit_transform(data) else: result[tf] = self.scalers[tf].transform(data) return result def get_realtime_features(self, timeframes=None, window_size=20): """ Get the most recent data for real-time prediction. Args: timeframes (list): List of timeframes to use window_size (int): Size of the sliding window Returns: dict: Dictionary of feature arrays for the latest window """ if timeframes is None: timeframes = self.timeframes # Get fresh data data_dict = self.get_multi_timeframe_data(timeframes=timeframes, refresh=True) result = {} for tf, df in data_dict.items(): if len(df) < window_size: logger.warning(f"Not enough data for {tf} (need {window_size}, got {len(df)})") continue # Get the latest window latest_data = df.tail(window_size).drop('timestamp', axis=1).values # Add extra dimension to match model input shape [1, window_size, features] result[tf] = latest_data.reshape(1, window_size, -1) # Apply normalization using existing scalers if self.scalers: result = self.normalize_data(result, fit=False) return result def calculate_pnl(self, predictions, prices, position_size=1.0, fee_rate=0.0002): """ Calculate PnL and win rate from predictions. Args: predictions (np.ndarray): Array of predicted actions (0=SELL, 1=HOLD, 2=BUY) prices (np.ndarray): Array of prices position_size (float): Size of each position fee_rate (float): Trading fee rate (default: 0.0002 for 0.02% per trade) Returns: tuple: (total_pnl, win_rate, trades) """ if len(predictions) < 2 or len(prices) < 2: return 0.0, 0.0, [] # Ensure arrays are the same length min_len = min(len(predictions), len(prices)-1) actions = predictions[:min_len] pnl = 0.0 wins = 0 trades = [] for i in range(min_len): current_price = prices[i] next_price = prices[i+1] action = actions[i] # Skip HOLD actions if action == 1: continue price_change = (next_price - current_price) / current_price if action == 2: # BUY # Calculate raw PnL raw_pnl = price_change * position_size # Calculate fees (entry and exit) entry_fee = position_size * fee_rate exit_fee = position_size * (1 + price_change) * fee_rate total_fees = entry_fee + exit_fee # Net PnL after fees trade_pnl = raw_pnl - total_fees trade_type = 'BUY' is_win = trade_pnl > 0 elif action == 0: # SELL # Calculate raw PnL raw_pnl = -price_change * position_size # Calculate fees (entry and exit) entry_fee = position_size * fee_rate exit_fee = position_size * (1 - price_change) * fee_rate total_fees = entry_fee + exit_fee # Net PnL after fees trade_pnl = raw_pnl - total_fees trade_type = 'SELL' is_win = trade_pnl > 0 else: continue pnl += trade_pnl wins += int(is_win) trades.append({ 'type': trade_type, 'entry': float(current_price), # Ensure serializable 'exit': float(next_price), 'raw_pnl': float(raw_pnl), 'fees': float(total_fees), 'pnl': float(trade_pnl), 'win': bool(is_win), 'timestamp': datetime.now().isoformat() # Add timestamp }) win_rate = wins / len(trades) if trades else 0.0 return float(pnl), float(win_rate), trades # Configure logging with more detailed format logging.basicConfig( level=logging.INFO, # Changed to DEBUG for more detailed logs format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('realtime_chart.log') ] ) logger = logging.getLogger(__name__) # Neural Network integration (conditional import) NN_ENABLED = os.environ.get('ENABLE_NN_MODELS', '0') == '1' nn_orchestrator = None nn_inference_thread = None if NN_ENABLED: try: import sys # Add project root to sys.path if needed project_root = os.path.dirname(os.path.abspath(__file__)) if project_root not in sys.path: sys.path.append(project_root) from NN.main import NeuralNetworkOrchestrator logger.info("Neural Network module enabled") except ImportError as e: logger.warning(f"Failed to import Neural Network module, disabling NN features: {str(e)}") NN_ENABLED = False # NN utility functions def setup_neural_network(): """Initialize the neural network components if enabled""" global nn_orchestrator, NN_ENABLED if not NN_ENABLED: return False try: # Get configuration from environment variables or use defaults symbol = os.environ.get('NN_SYMBOL', 'ETH/USDT') timeframes = os.environ.get('NN_TIMEFRAMES', '1m,5m,1h,4h,1d').split(',') output_size = int(os.environ.get('NN_OUTPUT_SIZE', '3')) # 3 for BUY/HOLD/SELL # Configure the orchestrator config = { 'symbol': symbol, 'timeframes': timeframes, 'window_size': int(os.environ.get('NN_WINDOW_SIZE', '20')), 'n_features': 5, # OHLCV 'output_size': output_size, 'model_dir': 'NN/models/saved', 'data_dir': 'NN/data' } # Initialize the orchestrator logger.info(f"Initializing Neural Network Orchestrator with config: {config}") nn_orchestrator = NeuralNetworkOrchestrator(config) # Load the model model_loaded = nn_orchestrator.load_model() if not model_loaded: logger.warning("Failed to load neural network model. Using untrained model.") return model_loaded except Exception as e: logger.error(f"Error setting up neural network: {str(e)}") NN_ENABLED = False return False def start_nn_inference_thread(interval_seconds): """Start a background thread to periodically run inference with the neural network""" global nn_inference_thread if not NN_ENABLED or nn_orchestrator is None: logger.warning("Cannot start inference thread - Neural Network not enabled or initialized") return False def inference_worker(): """Worker function for the inference thread""" model_type = os.environ.get('NN_MODEL_TYPE', 'cnn') timeframe = os.environ.get('NN_TIMEFRAME', '1h') logger.info(f"Starting neural network inference thread with {interval_seconds}s interval") logger.info(f"Using model type: {model_type}, timeframe: {timeframe}") # Wait a bit for charts to initialize time.sleep(5) # Track active charts active_charts = [] while True: try: # Find active charts if we don't have them yet if not active_charts and 'charts' in globals(): active_charts = globals()['charts'] logger.info(f"Found {len(active_charts)} active charts for NN signals") # Run inference result = nn_orchestrator.run_inference_pipeline( model_type=model_type, timeframe=timeframe ) if result: # Log the result logger.info(f"Neural network inference result: {result}") # Add signal to charts if active_charts: try: if 'action' in result: action = result['action'] timestamp = datetime.fromisoformat(result['timestamp'].replace('Z', '+00:00')) # Get probability if available probability = None if 'probability' in result: probability = result['probability'] elif 'probabilities' in result: probability = result['probabilities'].get(action, None) # Add signal to each chart for chart in active_charts: if hasattr(chart, 'add_nn_signal'): chart.add_nn_signal(action, timestamp, probability) except Exception as e: logger.error(f"Error adding NN signal to chart: {str(e)}") import traceback logger.error(traceback.format_exc()) # Sleep for the interval time.sleep(interval_seconds) except Exception as e: logger.error(f"Error in inference thread: {str(e)}") import traceback logger.error(traceback.format_exc()) time.sleep(5) # Wait a bit before retrying # Create and start the thread nn_inference_thread = threading.Thread(target=inference_worker, daemon=True) nn_inference_thread.start() return True # Try to get local timezone, default to Sofia/EET if not available try: local_timezone = tzlocal.get_localzone() # Get timezone name safely try: tz_name = str(local_timezone) # Handle case where it might be zoneinfo.ZoneInfo object instead of pytz timezone if hasattr(local_timezone, 'zone'): tz_name = local_timezone.zone elif hasattr(local_timezone, 'key'): tz_name = local_timezone.key else: tz_name = str(local_timezone) except: tz_name = "Local" logger.info(f"Detected local timezone: {local_timezone} ({tz_name})") except Exception as e: logger.warning(f"Could not detect local timezone: {str(e)}. Defaulting to Sofia/EET") local_timezone = pytz.timezone('Europe/Sofia') tz_name = "Europe/Sofia" def convert_to_local_time(timestamp): """Convert timestamp to local timezone""" try: if isinstance(timestamp, pd.Timestamp): dt = timestamp.to_pydatetime() elif isinstance(timestamp, np.datetime64): dt = pd.Timestamp(timestamp).to_pydatetime() elif isinstance(timestamp, str): dt = pd.to_datetime(timestamp).to_pydatetime() else: dt = timestamp # If datetime is naive (no timezone), assume it's UTC if dt.tzinfo is None: dt = dt.replace(tzinfo=pytz.UTC) # Convert to local timezone local_dt = dt.astimezone(local_timezone) return local_dt except Exception as e: logger.error(f"Error converting timestamp to local time: {str(e)}") return timestamp # Initialize TimescaleDB handler - only once, at module level timescaledb_handler = TimescaleDBHandler() if TIMESCALEDB_ENABLED else None class TickStorage: def __init__(self, symbol, timeframes=None, use_timescaledb=False): """Initialize the tick storage for a specific symbol""" self.symbol = symbol self.timeframes = timeframes or ["1s", "5m", "15m", "1h", "4h", "1d"] self.ticks = [] self.candles = {tf: [] for tf in self.timeframes} self.current_candle = {tf: None for tf in self.timeframes} self.last_candle_timestamp = {tf: None for tf in self.timeframes} self.cache_dir = os.path.join(os.getcwd(), "cache", symbol.replace("/", "")) self.cache_path = os.path.join(self.cache_dir, f"{symbol.replace('/', '')}_ticks.json") # Add missing cache_path self.use_timescaledb = use_timescaledb self.max_ticks = 10000 # Maximum number of ticks to store in memory # Create cache directory if it doesn't exist os.makedirs(self.cache_dir, exist_ok=True) logger.info(f"Creating new tick storage for {symbol} with timeframes {self.timeframes}") logger.info(f"Cache directory: {self.cache_dir}") logger.info(f"Cache file: {self.cache_path}") if use_timescaledb: print(f"TickStorage: TimescaleDB integration is ENABLED for {symbol}") else: logger.info(f"TickStorage: TimescaleDB integration is DISABLED for {symbol}") def _save_to_cache(self): """Save ticks to a cache file""" try: # Only save the latest 5000 ticks to avoid giant files ticks_to_save = self.ticks[-5000:] if len(self.ticks) > 5000 else self.ticks # Convert pandas Timestamps to ISO strings for JSON serialization serializable_ticks = [] for tick in ticks_to_save: serializable_tick = tick.copy() if isinstance(tick['timestamp'], pd.Timestamp): serializable_tick['timestamp'] = tick['timestamp'].isoformat() elif hasattr(tick['timestamp'], 'isoformat'): serializable_tick['timestamp'] = tick['timestamp'].isoformat() else: # Keep as is if it's already a string or number serializable_tick['timestamp'] = tick['timestamp'] serializable_ticks.append(serializable_tick) with open(self.cache_path, 'w') as f: json.dump(serializable_ticks, f) logger.debug(f"Saved {len(serializable_ticks)} ticks to cache") except Exception as e: logger.error(f"Error saving ticks to cache: {e}") def _load_from_cache(self): """Load ticks from cache if available""" if os.path.exists(self.cache_path): try: # Check if the cache file is recent (< 10 minutes old) cache_age = time.time() - os.path.getmtime(self.cache_path) if cache_age > 600: # 10 minutes in seconds logger.warning(f"Cache file is {cache_age:.1f} seconds old (>10 min). Not using it.") return False with open(self.cache_path, 'r') as f: cached_ticks = json.load(f) if cached_ticks: # Convert ISO strings back to pandas Timestamps processed_ticks = [] for tick in cached_ticks: processed_tick = tick.copy() if isinstance(tick['timestamp'], str): try: processed_tick['timestamp'] = pd.Timestamp(tick['timestamp']) except: # If parsing fails, use current time processed_tick['timestamp'] = pd.Timestamp.now() else: # Convert to pandas Timestamp if it's a number (milliseconds) processed_tick['timestamp'] = pd.Timestamp(tick['timestamp'], unit='ms') processed_ticks.append(processed_tick) self.ticks = processed_ticks logger.info(f"Loaded {len(cached_ticks)} ticks from cache") return True except Exception as e: logger.error(f"Error loading ticks from cache: {e}") return False def add_tick(self, tick=None, price=None, volume=None, timestamp=None): """ Add a tick to the storage and update candles for all timeframes Args: tick (dict, optional): A tick object containing price, quantity and timestamp price (float, optional): Price of the tick (used in older interface) volume (float, optional): Volume of the tick (used in older interface) timestamp (datetime, optional): Timestamp of the tick (used in older interface) """ # Handle tick as a dict or separate parameters for backward compatibility if tick is not None and isinstance(tick, dict): # Using the new interface with a tick object price = tick['price'] volume = tick.get('quantity', 0) timestamp = tick['timestamp'] elif price is not None: # Using the old interface with separate parameters # Convert datetime to pd.Timestamp if needed if timestamp is not None and not isinstance(timestamp, pd.Timestamp): timestamp = pd.Timestamp(timestamp) else: logger.error("Invalid tick: must provide either a tick dict or price") return # Ensure timestamp is a pandas Timestamp if not isinstance(timestamp, pd.Timestamp): if isinstance(timestamp, (int, float)): # Assume it's milliseconds timestamp = pd.Timestamp(timestamp, unit='ms') else: # Try to parse as string or datetime timestamp = pd.Timestamp(timestamp) # Create tick object with consistent pandas Timestamp tick_obj = { 'price': float(price), 'quantity': float(volume) if volume is not None else 0.0, 'timestamp': timestamp } # Add to the list of ticks self.ticks.append(tick_obj) # Limit the number of ticks to avoid memory issues if len(self.ticks) > self.max_ticks: self.ticks = self.ticks[-self.max_ticks:] # Update candles for all timeframes for timeframe in self.timeframes: if timeframe == "1s": self._update_1s_candle(tick_obj) else: self._update_candles_for_timeframe(timeframe, tick_obj) # Cache to disk periodically self._try_cache_ticks() def _update_1s_candle(self, tick): """Update the 1-second candle with the new tick""" # Get timestamp for the start of the current second tick_timestamp = tick['timestamp'] candle_timestamp = pd.Timestamp(int(tick_timestamp.timestamp() // 1 * 1_000_000_000)) # Check if we need to create a new candle if self.current_candle["1s"] is None or self.current_candle["1s"]["timestamp"] != candle_timestamp: # If we have a current candle, finalize it and add to candles list if self.current_candle["1s"] is not None: # Add the completed candle to the list self.candles["1s"].append(self.current_candle["1s"]) # Limit the number of stored candles to prevent memory issues if len(self.candles["1s"]) > 3600: # Keep last hour of 1s candles self.candles["1s"] = self.candles["1s"][-3600:] # Store in TimescaleDB if enabled if self.use_timescaledb: timescaledb_handler.upsert_candle( self.symbol, "1s", self.current_candle["1s"] ) # Log completed candle for debugging logger.debug(f"Completed 1s candle: {self.current_candle['1s']['timestamp']} - Close: {self.current_candle['1s']['close']}") # Create a new candle self.current_candle["1s"] = { "timestamp": candle_timestamp, "open": float(tick["price"]), "high": float(tick["price"]), "low": float(tick["price"]), "close": float(tick["price"]), "volume": float(tick["quantity"]) if "quantity" in tick else 0.0 } # Update last candle timestamp self.last_candle_timestamp["1s"] = candle_timestamp logger.debug(f"Created new 1s candle at {candle_timestamp}") else: # Update the current candle current = self.current_candle["1s"] price = float(tick["price"]) # Update high and low if price > current["high"]: current["high"] = price if price < current["low"]: current["low"] = price # Update close price and add volume current["close"] = price current["volume"] += float(tick["quantity"]) if "quantity" in tick else 0.0 def _update_candles_for_timeframe(self, timeframe, tick): """Update candles for a specific timeframe""" # Skip 1s as it's handled separately if timeframe == "1s": return # Convert timeframe to seconds timeframe_seconds = self._timeframe_to_seconds(timeframe) # Get the timestamp truncated to the timeframe interval # e.g., for a 5m candle, the timestamp should be truncated to the nearest 5-minute mark # Convert timestamp to datetime if it's not already tick_timestamp = tick['timestamp'] if isinstance(tick_timestamp, pd.Timestamp): ts = tick_timestamp else: ts = pd.Timestamp(tick_timestamp) # Truncate timestamp to nearest timeframe interval timestamp = pd.Timestamp( int(ts.timestamp() // timeframe_seconds * timeframe_seconds * 1_000_000_000) ) # Get the current candle for this timeframe current_candle = self.current_candle[timeframe] # If we have no current candle or the timestamp is different (new candle) if current_candle is None or current_candle['timestamp'] != timestamp: # If we have a current candle, add it to the candles list if current_candle: self.candles[timeframe].append(current_candle) # Save to TimescaleDB if enabled if self.use_timescaledb: timescaledb_handler.upsert_candle(self.symbol, timeframe, current_candle) # Create a new candle current_candle = { 'timestamp': timestamp, 'open': tick['price'], 'high': tick['price'], 'low': tick['price'], 'close': tick['price'], 'volume': tick.get('quantity', 0) } # Update current candle self.current_candle[timeframe] = current_candle self.last_candle_timestamp[timeframe] = timestamp else: # Update existing candle current_candle['high'] = max(current_candle['high'], tick['price']) current_candle['low'] = min(current_candle['low'], tick['price']) current_candle['close'] = tick['price'] current_candle['volume'] += tick.get('quantity', 0) # Limit the number of candles to avoid memory issues max_candles = 1000 if len(self.candles[timeframe]) > max_candles: self.candles[timeframe] = self.candles[timeframe][-max_candles:] def _timeframe_to_seconds(self, timeframe): """Convert a timeframe string (e.g., '1m', '1h') to seconds""" if timeframe == "1s": return 1 try: # Extract the number and unit match = re.match(r'(\d+)([smhdw])', timeframe) if not match: return None num, unit = match.groups() num = int(num) # Convert to seconds if unit == 's': return num elif unit == 'm': return num * 60 elif unit == 'h': return num * 3600 elif unit == 'd': return num * 86400 elif unit == 'w': return num * 604800 return None except: return None def get_candles(self, timeframe, limit=None): """Get candles for a given timeframe""" if timeframe in self.candles: candles = self.candles[timeframe] # Add the current candle if it exists and isn't None if timeframe in self.current_candle and self.current_candle[timeframe] is not None: # Make a copy of the current candle current_candle_copy = self.current_candle[timeframe].copy() # Check if the current candle is newer than the last candle in the list if not candles or current_candle_copy["timestamp"] > candles[-1]["timestamp"]: candles = candles + [current_candle_copy] # Apply limit if provided if limit and len(candles) > limit: return candles[-limit:] return candles return [] def get_last_price(self): """Get the last known price""" if self.ticks: return float(self.ticks[-1]["price"]) return None def load_historical_data(self, symbol, limit=1000): """Load historical data for all timeframes""" logger.info(f"Starting historical data load for {symbol} with limit {limit}") # Clear existing data self.ticks = [] self.candles = {tf: [] for tf in self.timeframes} self.current_candle = {tf: None for tf in self.timeframes} # Try to load ticks from cache first logger.info("Attempting to load from cache...") cache_loaded = self._load_from_cache() if cache_loaded: logger.info("Successfully loaded data from cache") else: logger.info("No valid cache data found") # Check if we have TimescaleDB enabled if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: logger.info("Attempting to fetch historical data from TimescaleDB") loaded_from_db = False # Load candles for each timeframe from TimescaleDB for tf in self.timeframes: try: candles = timescaledb_handler.fetch_candles(symbol, tf, limit) if candles: self.candles[tf] = candles loaded_from_db = True logger.info(f"Loaded {len(candles)} {tf} candles from TimescaleDB") else: logger.info(f"No {tf} candles found in TimescaleDB") except Exception as e: logger.error(f"Error loading {tf} candles from TimescaleDB: {str(e)}") if loaded_from_db: logger.info("Successfully loaded historical data from TimescaleDB") return True else: logger.info("TimescaleDB not available or disabled") # If no TimescaleDB data and no cache, we need to get from Binance API if not cache_loaded: logger.info("Loading data from Binance API...") # Create a BinanceHistoricalData instance historical_data = BinanceHistoricalData() # Load data for each timeframe success_count = 0 for tf in self.timeframes: if tf != "1s": # Skip 1s since we'll generate it from ticks try: logger.info(f"Fetching {tf} candles for {symbol}...") df = historical_data.get_historical_candles(symbol, self._timeframe_to_seconds(tf), limit) if df is not None and not df.empty: logger.info(f"Loaded {len(df)} {tf} candles from Binance API") # Convert to our candle format and store candles = [] for _, row in df.iterrows(): candle = { 'timestamp': row['timestamp'], 'open': row['open'], 'high': row['high'], 'low': row['low'], 'close': row['close'], 'volume': row['volume'] } candles.append(candle) # Also save to TimescaleDB if enabled if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: timescaledb_handler.upsert_candle(symbol, tf, candle) self.candles[tf] = candles success_count += 1 else: logger.warning(f"No data returned for {tf} candles") except Exception as e: logger.error(f"Error loading {tf} candles: {str(e)}") import traceback logger.error(traceback.format_exc()) logger.info(f"Successfully loaded {success_count} timeframes from Binance API") # For 1s, load from API if possible or compute from first available timeframe if "1s" in self.timeframes: logger.info("Loading 1s candles...") # Try to get 1s data from Binance try: df_1s = historical_data.get_historical_candles(symbol, 1, 300) # Only need recent 1s data if df_1s is not None and not df_1s.empty: logger.info(f"Loaded {len(df_1s)} recent 1s candles from Binance API") # Convert to our candle format and store candles_1s = [] for _, row in df_1s.iterrows(): candle = { 'timestamp': row['timestamp'], 'open': row['open'], 'high': row['high'], 'low': row['low'], 'close': row['close'], 'volume': row['volume'] } candles_1s.append(candle) # Also save to TimescaleDB if enabled if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: timescaledb_handler.upsert_candle(symbol, "1s", candle) self.candles["1s"] = candles_1s except Exception as e: logger.error(f"Error loading 1s candles: {str(e)}") # If 1s data not available or failed to load, approximate from 1m data if not self.candles.get("1s"): logger.info("1s data not available, trying to approximate from 1m data...") # If 1s data not available, we can approximate from 1m data if "1m" in self.timeframes and self.candles["1m"]: # For demonstration, just use the 1m candles as placeholders for 1s # In a real implementation, you might want more sophisticated interpolation logger.info("Using 1m candles as placeholders for 1s timeframe") self.candles["1s"] = [] # Take the most recent 5 minutes of 1m candles recent_1m = self.candles["1m"][-5:] if self.candles["1m"] else [] logger.info(f"Creating 1s approximations from {len(recent_1m)} 1m candles") for candle_1m in recent_1m: # Create 60 1s candles for each 1m candle ts_base = candle_1m["timestamp"].timestamp() for i in range(60): # Create a 1s candle with interpolated values candle_1s = { 'timestamp': pd.Timestamp(int((ts_base + i) * 1_000_000_000)), 'open': candle_1m['open'], 'high': candle_1m['high'], 'low': candle_1m['low'], 'close': candle_1m['close'], 'volume': candle_1m['volume'] / 60.0 # Distribute volume evenly } self.candles["1s"].append(candle_1s) # Also save to TimescaleDB if enabled if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: timescaledb_handler.upsert_candle(symbol, "1s", candle_1s) logger.info(f"Created {len(self.candles['1s'])} approximated 1s candles") else: logger.warning("No 1m data available to approximate 1s candles from") # Set the last candle of each timeframe as the current candle for tf in self.timeframes: if self.candles[tf]: self.current_candle[tf] = self.candles[tf][-1].copy() self.last_candle_timestamp[tf] = self.current_candle[tf]["timestamp"] logger.debug(f"Set current candle for {tf}: {self.current_candle[tf]['timestamp']}") # If we loaded ticks from cache, rebuild candles if cache_loaded: logger.info("Rebuilding candles from cached ticks...") # Clear candles self.candles = {tf: [] for tf in self.timeframes} self.current_candle = {tf: None for tf in self.timeframes} # Process each tick to rebuild the candles for tick in self.ticks: for tf in self.timeframes: if tf == "1s": self._update_1s_candle(tick) else: self._update_candles_for_timeframe(tf, tick) logger.info("Finished rebuilding candles from ticks") # Log final results for tf in self.timeframes: count = len(self.candles[tf]) logger.info(f"Final {tf} candle count: {count}") has_data = cache_loaded or any(self.candles[tf] for tf in self.timeframes) logger.info(f"Historical data loading completed. Has data: {has_data}") return has_data def _try_cache_ticks(self): """Try to save ticks to cache periodically""" # Only save to cache every 100 ticks to avoid excessive disk I/O if len(self.ticks) % 100 == 0: try: self._save_to_cache() except Exception as e: # Don't spam logs with cache errors, just log once every 1000 ticks if len(self.ticks) % 1000 == 0: logger.warning(f"Cache save failed at {len(self.ticks)} ticks: {str(e)}") pass # Continue even if cache fails class Position: """Represents a trading position""" def __init__(self, action, entry_price, amount, timestamp=None, trade_id=None, fee_rate=0.0002): self.action = action self.entry_price = entry_price self.amount = amount self.entry_timestamp = timestamp or datetime.now() self.exit_timestamp = None self.exit_price = None self.pnl = None self.is_open = True self.trade_id = trade_id or str(uuid.uuid4())[:8] self.fee_rate = fee_rate self.paid_fee = entry_price * amount * fee_rate # Calculate entry fee def close(self, exit_price, exit_timestamp=None): """Close an open position""" self.exit_price = exit_price self.exit_timestamp = exit_timestamp or datetime.now() self.is_open = False # Calculate P&L if self.action == "BUY": price_diff = self.exit_price - self.entry_price # Calculate fee for exit trade exit_fee = exit_price * self.amount * self.fee_rate self.paid_fee += exit_fee # Add exit fee to total paid fee self.pnl = (price_diff * self.amount) - self.paid_fee else: # SELL price_diff = self.entry_price - self.exit_price # Calculate fee for exit trade exit_fee = exit_price * self.amount * self.fee_rate self.paid_fee += exit_fee # Add exit fee to total paid fee self.pnl = (price_diff * self.amount) - self.paid_fee return self.pnl class RealTimeChart: def __init__(self, app=None, symbol='BTCUSDT', timeframe='1m', standalone=True, chart_title=None, run_signal_interpreter=False, debug_mode=False, historical_candles=None, extended_hours=False, enable_logging=True, agent=None, trading_env=None, max_memory_usage=90, memory_check_interval=10, tick_update_interval=0.5, chart_update_interval=1, performance_monitoring=False, show_volume=True, show_indicators=True, custom_trades=None, port=8050, height=900, width=1200, positions_callback=None, allow_synthetic_data=True, tick_storage=None): """Initialize a real-time chart with support for multiple indicators and backtesting.""" # Store parameters self.symbol = symbol self.timeframe = timeframe self.debug_mode = debug_mode self.standalone = standalone self.chart_title = chart_title or f"{symbol} Real-Time Chart" self.extended_hours = extended_hours self.enable_logging = enable_logging self.run_signal_interpreter = run_signal_interpreter self.historical_candles = historical_candles self.performance_monitoring = performance_monitoring self.max_memory_usage = max_memory_usage self.memory_check_interval = memory_check_interval self.tick_update_interval = tick_update_interval self.chart_update_interval = chart_update_interval self.show_volume = show_volume self.show_indicators = show_indicators self.custom_trades = custom_trades self.port = port self.height = height self.width = width self.positions_callback = positions_callback self.allow_synthetic_data = allow_synthetic_data # Initialize interval store self.interval_store = {'interval': 1} # Default to 1s timeframe # Initialize trading components self.agent = agent self.trading_env = trading_env # Initialize button styles for timeframe selection self.button_style = { 'background': '#343a40', 'color': 'white', 'border': 'none', 'padding': '10px 20px', 'margin': '0 5px', 'borderRadius': '4px', 'cursor': 'pointer' } self.active_button_style = { 'background': '#007bff', 'color': 'white', 'border': 'none', 'padding': '10px 20px', 'margin': '0 5px', 'borderRadius': '4px', 'cursor': 'pointer', 'fontWeight': 'bold' } # Initialize color schemes self.colors = { 'background': '#1e1e1e', 'text': '#ffffff', 'grid': '#333333', 'candle_up': '#26a69a', 'candle_down': '#ef5350', 'volume_up': 'rgba(38, 166, 154, 0.3)', 'volume_down': 'rgba(239, 83, 80, 0.3)', 'ma': '#ffeb3b', 'ema': '#29b6f6', 'bollinger_bands': '#ff9800', 'trades_buy': '#00e676', 'trades_sell': '#ff1744' } # Initialize data storage self.all_trades = [] # Store trades self.positions = [] # Store open positions self.latest_price = 0.0 self.latest_volume = 0.0 self.latest_timestamp = datetime.now() self.current_balance = 100.0 # Starting balance self.accumulative_pnl = 0.0 # Accumulated profit/loss # Initialize trade rate counter self.trade_count = 0 self.start_time = time.time() self.trades_per_second = 0 self.trades_per_minute = 0 self.trades_per_hour = 0 # Initialize trade rate tracking variables self.trade_times = [] # Store timestamps of recent trades for rate calculation self.last_trade_rate_calculation = datetime.now() self.trade_rate = {"per_second": 0, "per_minute": 0, "per_hour": 0} # Initialize interactive components self.app = app # Create a new app if not provided if self.app is None and standalone: self.app = dash.Dash( __name__, external_stylesheets=[dbc.themes.DARKLY], suppress_callback_exceptions=True ) # Initialize tick storage if not provided if tick_storage is None: # Check if TimescaleDB integration is enabled use_timescaledb = TIMESCALEDB_ENABLED and timescaledb_handler is not None # Create a new tick storage self.tick_storage = TickStorage( symbol=symbol, timeframes=["1s", "1m", "5m", "15m", "1h", "4h", "1d"], use_timescaledb=use_timescaledb ) # Load historical data immediately for cold start logger.info(f"Loading historical data for {symbol} during chart initialization") try: data_loaded = self.tick_storage.load_historical_data(symbol) if data_loaded: logger.info(f"Successfully loaded historical data for {symbol}") # Log what we have for tf in ["1s", "1m", "5m", "15m", "1h"]: candle_count = len(self.tick_storage.candles.get(tf, [])) logger.info(f" {tf}: {candle_count} candles") else: logger.warning(f"Failed to load historical data for {symbol}") except Exception as e: logger.error(f"Error loading historical data during initialization: {str(e)}") import traceback logger.error(traceback.format_exc()) else: self.tick_storage = tick_storage # Create layout and callbacks if app is provided if self.app is not None: # Create the layout self.app.layout = self._create_layout() # Register callbacks self._setup_callbacks() # Log initialization if self.enable_logging: logger.info(f"RealTimeChart initialized: {self.symbol} ({self.timeframe}) ") def _create_layout(self): return html.Div([ # Header section with title and current price html.Div([ html.H1(f"{self.symbol} Real-Time Chart", className="display-4"), # Current price ticker html.Div([ html.H4("Current Price:", style={"display": "inline-block", "marginRight": "10px"}), html.H3(id="current-price", style={"display": "inline-block", "color": "#17a2b8"}), html.Div([ html.H5("Balance:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}), html.H5(id="current-balance", style={"display": "inline-block", "color": "#28a745"}), ], style={"display": "inline-block", "marginLeft": "40px"}), html.Div([ html.H5("Accumulated PnL:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}), html.H5(id="accumulated-pnl", style={"display": "inline-block", "color": "#ffc107"}), ], style={"display": "inline-block", "marginLeft": "40px"}), # Add trade rate display html.Div([ html.H5("Trade Rate:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}), html.Span([ html.Span(id="trade-rate-second", style={"color": "#ff7f0e"}), html.Span("/s, "), html.Span(id="trade-rate-minute", style={"color": "#ff7f0e"}), html.Span("/m, "), html.Span(id="trade-rate-hour", style={"color": "#ff7f0e"}), html.Span("/h") ], style={"display": "inline-block"}), ], style={"display": "inline-block", "marginLeft": "40px"}), ], style={"textAlign": "center", "margin": "20px 0"}), ], style={"textAlign": "center", "marginBottom": "20px"}), # Add interval component for periodic updates dcc.Interval( id='interval-component', interval=500, # in milliseconds n_intervals=0 ), # Add timeframe selection buttons html.Div([ html.Button('1s', id='btn-1s', n_clicks=0, style=self.active_button_style), html.Button('5s', id='btn-5s', n_clicks=0, style=self.button_style), html.Button('15s', id='btn-15s', n_clicks=0, style=self.button_style), html.Button('1m', id='btn-1m', n_clicks=0, style=self.button_style), html.Button('5m', id='btn-5m', n_clicks=0, style=self.button_style), html.Button('15m', id='btn-15m', n_clicks=0, style=self.button_style), html.Button('1h', id='btn-1h', n_clicks=0, style=self.button_style), ], style={"textAlign": "center", "marginBottom": "20px"}), # Store for the selected timeframe dcc.Store(id='interval-store', data={'interval': 1}), # Chart content (without wrapper div to avoid callback issues) dcc.Graph(id='live-chart', style={"height": "600px"}), dcc.Graph(id='secondary-charts', style={"height": "500px"}), html.Div(id='positions-list') ]) def _create_chart_and_controls(self): """Create the chart and controls for the dashboard.""" try: # Get selected interval from the dashboard (default to 1s if not available) interval_seconds = 1 if hasattr(self, 'interval_store') and self.interval_store: interval_seconds = self.interval_store.get('interval', 1) # Create chart components chart_div = html.Div([ # Update chart with data for the selected interval dcc.Graph( id='live-chart', figure=self._update_main_chart(interval_seconds), style={"height": "600px"} ), # Update secondary charts dcc.Graph( id='secondary-charts', figure=self._update_secondary_charts(), style={"height": "500px"} ), # Update positions list html.Div( id='positions-list', children=self._get_position_list_rows() ) ]) return chart_div except Exception as e: logger.error(f"Error creating chart and controls: {str(e)}") import traceback logger.error(traceback.format_exc()) # Return a simple error message as fallback return html.Div(f"Error loading chart: {str(e)}", style={"color": "red", "padding": "20px"}) def _setup_callbacks(self): """Setup Dash callbacks for the real-time chart""" if self.app is None: return try: # Update chart with all components based on interval @self.app.callback( [ Output('live-chart', 'figure'), Output('secondary-charts', 'figure'), Output('positions-list', 'children'), Output('current-price', 'children'), Output('current-balance', 'children'), Output('accumulated-pnl', 'children'), Output('trade-rate-second', 'children'), Output('trade-rate-minute', 'children'), Output('trade-rate-hour', 'children') ], [ Input('interval-component', 'n_intervals'), Input('interval-store', 'data') ] ) def update_all(n_intervals, interval_data): """Update all chart components""" try: # Get selected interval interval_seconds = interval_data.get('interval', 1) if interval_data else 1 # Update main chart - limit data for performance main_chart = self._update_main_chart(interval_seconds) # Update secondary charts - limit data for performance secondary_charts = self._update_secondary_charts() # Update positions list positions_list = self._get_position_list_rows() # Update current price and balance current_price = f"${self.latest_price:.2f}" if self.latest_price else "Error" current_balance = f"${self.current_balance:.2f}" accumulated_pnl = f"${self.accumulative_pnl:.2f}" # Calculate trade rates trade_rate = self._calculate_trade_rate() trade_rate_second = f"{trade_rate['per_second']:.1f}" trade_rate_minute = f"{trade_rate['per_minute']:.1f}" trade_rate_hour = f"{trade_rate['per_hour']:.1f}" return (main_chart, secondary_charts, positions_list, current_price, current_balance, accumulated_pnl, trade_rate_second, trade_rate_minute, trade_rate_hour) except Exception as e: logger.error(f"Error in update_all callback: {str(e)}") # Return empty/error states import plotly.graph_objects as go empty_fig = go.Figure() empty_fig.add_annotation(text="Chart Loading...", xref="paper", yref="paper", x=0.5, y=0.5) return (empty_fig, empty_fig, [], "Loading...", "$0.00", "$0.00", "0.0", "0.0", "0.0") # Timeframe selection callbacks @self.app.callback( [Output('interval-store', 'data'), Output('btn-1s', 'style'), Output('btn-5s', 'style'), Output('btn-15s', 'style'), Output('btn-1m', 'style'), Output('btn-5m', 'style'), Output('btn-15m', 'style'), Output('btn-1h', 'style')], [Input('btn-1s', 'n_clicks'), Input('btn-5s', 'n_clicks'), Input('btn-15s', 'n_clicks'), Input('btn-1m', 'n_clicks'), Input('btn-5m', 'n_clicks'), Input('btn-15m', 'n_clicks'), Input('btn-1h', 'n_clicks')] ) def update_timeframe(n1s, n5s, n15s, n1m, n5m, n15m, n1h): """Update selected timeframe based on button clicks""" ctx = dash.callback_context if not ctx.triggered: # Default to 1s styles = [self.active_button_style] + [self.button_style] * 6 return {'interval': 1}, *styles button_id = ctx.triggered[0]['prop_id'].split('.')[0] # Map button to interval seconds interval_map = { 'btn-1s': 1, 'btn-5s': 5, 'btn-15s': 15, 'btn-1m': 60, 'btn-5m': 300, 'btn-15m': 900, 'btn-1h': 3600 } selected_interval = interval_map.get(button_id, 1) # Create styles - active for selected, normal for others button_names = ['btn-1s', 'btn-5s', 'btn-15s', 'btn-1m', 'btn-5m', 'btn-15m', 'btn-1h'] styles = [] for name in button_names: if name == button_id: styles.append(self.active_button_style) else: styles.append(self.button_style) return {'interval': selected_interval}, *styles logger.info("Dash callbacks registered successfully") except Exception as e: logger.error(f"Error setting up callbacks: {str(e)}") import traceback logger.error(traceback.format_exc()) def _calculate_trade_rate(self): """Calculate trading rate per second, minute, and hour""" try: now = datetime.now() current_time = time.time() # Filter trades within different time windows trades_last_second = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 1) trades_last_minute = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 60) trades_last_hour = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 3600) return { "per_second": trades_last_second, "per_minute": trades_last_minute, "per_hour": trades_last_hour } except Exception as e: logger.warning(f"Error calculating trade rate: {str(e)}") return {"per_second": 0.0, "per_minute": 0.0, "per_hour": 0.0} def _update_secondary_charts(self): """Create secondary charts for volume and indicators""" try: # Create subplots for secondary charts fig = make_subplots( rows=2, cols=1, subplot_titles=['Volume', 'Technical Indicators'], shared_xaxes=True, vertical_spacing=0.1, row_heights=[0.3, 0.7] ) # Get latest candles (limit for performance) candles = self.tick_storage.candles.get("1m", [])[-100:] # Last 100 candles for performance if not candles: fig.add_annotation(text="No data available", xref="paper", yref="paper", x=0.5, y=0.5) fig.update_layout( title="Secondary Charts", template="plotly_dark", height=400 ) return fig # Extract data timestamps = [candle['timestamp'] for candle in candles] volumes = [candle['volume'] for candle in candles] closes = [candle['close'] for candle in candles] # Volume chart colors = ['#26a69a' if i == 0 or closes[i] >= closes[i-1] else '#ef5350' for i in range(len(closes))] fig.add_trace( go.Bar( x=timestamps, y=volumes, name='Volume', marker_color=colors, showlegend=False ), row=1, col=1 ) # Technical indicators if len(closes) >= 20: # Simple moving average sma_20 = pd.Series(closes).rolling(window=20).mean() fig.add_trace( go.Scatter( x=timestamps, y=sma_20, name='SMA 20', line=dict(color='#ffeb3b', width=2) ), row=2, col=1 ) # RSI calculation if len(closes) >= 14: rsi = self._calculate_rsi(closes, 14) fig.add_trace( go.Scatter( x=timestamps, y=rsi, name='RSI', line=dict(color='#29b6f6', width=2), yaxis='y3' ), row=2, col=1 ) # Update layout fig.update_layout( title="Volume & Technical Indicators", template="plotly_dark", height=400, showlegend=True, legend=dict(x=0, y=1, bgcolor='rgba(0,0,0,0)') ) # Update y-axes fig.update_yaxes(title="Volume", row=1, col=1) fig.update_yaxes(title="Price", row=2, col=1) return fig except Exception as e: logger.error(f"Error creating secondary charts: {str(e)}") # Return empty figure on error fig = go.Figure() fig.add_annotation(text=f"Error: {str(e)}", xref="paper", yref="paper", x=0.5, y=0.5) fig.update_layout(template="plotly_dark", height=400) return fig def _calculate_rsi(self, prices, period=14): """Calculate RSI indicator""" try: prices = pd.Series(prices) delta = prices.diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) return rsi.fillna(50).tolist() # Fill NaN with neutral RSI value except Exception: return [50] * len(prices) # Return neutral RSI on error def _get_position_list_rows(self): """Get list of current positions for display""" try: if not self.positions: return [html.Div("No open positions", style={"color": "#888", "padding": "10px"})] rows = [] for i, position in enumerate(self.positions): try: # Calculate current PnL current_pnl = (self.latest_price - position.entry_price) * position.amount if position.action.upper() == 'SELL': current_pnl = -current_pnl # Create position row row = html.Div([ html.Span(f"#{i+1}: ", style={"fontWeight": "bold"}), html.Span(f"{position.action.upper()} ", style={"color": "#00e676" if position.action.upper() == "BUY" else "#ff1744"}), html.Span(f"{position.amount:.4f} @ ${position.entry_price:.2f} "), html.Span(f"PnL: ${current_pnl:.2f}", style={"color": "#00e676" if current_pnl >= 0 else "#ff1744"}) ], style={"padding": "5px", "borderBottom": "1px solid #333"}) rows.append(row) except Exception as e: logger.warning(f"Error formatting position {i}: {str(e)}") return rows except Exception as e: logger.error(f"Error getting position list: {str(e)}") return [html.Div("Error loading positions", style={"color": "red", "padding": "10px"})] def add_trade(self, action, price, amount, timestamp=None, trade_id=None): """Add a trade to the chart and update tracking""" try: if timestamp is None: timestamp = datetime.now() # Create trade record trade = { 'id': trade_id or str(uuid.uuid4()), 'action': action.upper(), 'price': float(price), 'amount': float(amount), 'timestamp': timestamp, 'value': float(price) * float(amount) } # Add to trades list self.all_trades.append(trade) # Update trade rate tracking self.trade_times.append(time.time()) # Keep only last hour of trade times cutoff_time = time.time() - 3600 self.trade_times = [t for t in self.trade_times if t > cutoff_time] # Update positions if action.upper() in ['BUY', 'SELL']: position = Position( action=action.upper(), entry_price=float(price), amount=float(amount), timestamp=timestamp, trade_id=trade['id'] ) self.positions.append(position) # Update balance and PnL if action.upper() == 'BUY': self.current_balance -= trade['value'] else: # SELL self.current_balance += trade['value'] # Calculate PnL for this trade if len(self.all_trades) > 1: # Simple PnL calculation - more sophisticated logic could be added last_opposite_trades = [t for t in reversed(self.all_trades[:-1]) if t['action'] != action.upper()] if last_opposite_trades: last_trade = last_opposite_trades[0] if action.upper() == 'SELL': pnl = (float(price) - last_trade['price']) * float(amount) else: # BUY pnl = (last_trade['price'] - float(price)) * float(amount) self.accumulative_pnl += pnl logger.info(f"Added trade: {action.upper()} {amount} @ ${price:.2f}") except Exception as e: logger.error(f"Error adding trade: {str(e)}") def _get_interval_key(self, interval_seconds): """Convert interval seconds to timeframe key""" if interval_seconds <= 1: return "1s" elif interval_seconds <= 5: return "5s" if "5s" in self.tick_storage.timeframes else "1s" elif interval_seconds <= 15: return "15s" if "15s" in self.tick_storage.timeframes else "1m" elif interval_seconds <= 60: return "1m" elif interval_seconds <= 300: return "5m" elif interval_seconds <= 900: return "15m" elif interval_seconds <= 3600: return "1h" elif interval_seconds <= 14400: return "4h" else: return "1d" def _update_main_chart(self, interval_seconds): """Update the main chart for the specified interval""" try: # Convert interval seconds to timeframe key interval_key = self._get_interval_key(interval_seconds) # Get candles for this timeframe (limit to last 100 for performance) candles = self.tick_storage.candles.get(interval_key, [])[-100:] if not candles: logger.warning(f"No candle data available for {interval_key}") # Return empty figure with a message fig = go.Figure() fig.add_annotation( text=f"No data available for {interval_key}", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=16, color="white") ) fig.update_layout( title=f"{self.symbol} - {interval_key} Chart", template="plotly_dark", height=600 ) return fig # Extract data from candles timestamps = [candle['timestamp'] for candle in candles] opens = [candle['open'] for candle in candles] highs = [candle['high'] for candle in candles] lows = [candle['low'] for candle in candles] closes = [candle['close'] for candle in candles] volumes = [candle['volume'] for candle in candles] # Create candlestick chart fig = go.Figure() # Add candlestick trace fig.add_trace(go.Candlestick( x=timestamps, open=opens, high=highs, low=lows, close=closes, name="Price", increasing_line_color='#26a69a', decreasing_line_color='#ef5350', increasing_fillcolor='#26a69a', decreasing_fillcolor='#ef5350' )) # Add trade markers if we have trades if self.all_trades: # Filter trades to match the current timeframe window start_time = timestamps[0] if timestamps else datetime.now() - timedelta(hours=1) end_time = timestamps[-1] if timestamps else datetime.now() filtered_trades = [ trade for trade in self.all_trades if start_time <= trade['timestamp'] <= end_time ] if filtered_trades: buy_trades = [t for t in filtered_trades if t['action'] == 'BUY'] sell_trades = [t for t in filtered_trades if t['action'] == 'SELL'] # Add BUY markers if buy_trades: fig.add_trace(go.Scatter( x=[t['timestamp'] for t in buy_trades], y=[t['price'] for t in buy_trades], mode='markers', marker=dict( symbol='triangle-up', size=12, color='#00e676', line=dict(color='white', width=1) ), name='BUY', text=[f"BUY {t['amount']:.4f} @ ${t['price']:.2f}" for t in buy_trades], hovertemplate='%{text}
Time: %{x}' )) # Add SELL markers if sell_trades: fig.add_trace(go.Scatter( x=[t['timestamp'] for t in sell_trades], y=[t['price'] for t in sell_trades], mode='markers', marker=dict( symbol='triangle-down', size=12, color='#ff1744', line=dict(color='white', width=1) ), name='SELL', text=[f"SELL {t['amount']:.4f} @ ${t['price']:.2f}" for t in sell_trades], hovertemplate='%{text}
Time: %{x}' )) # Add moving averages if we have enough data if len(closes) >= 20: # 20-period SMA sma_20 = pd.Series(closes).rolling(window=20).mean() fig.add_trace(go.Scatter( x=timestamps, y=sma_20, name='SMA 20', line=dict(color='#ffeb3b', width=1), opacity=0.7 )) if len(closes) >= 50: # 50-period SMA sma_50 = pd.Series(closes).rolling(window=50).mean() fig.add_trace(go.Scatter( x=timestamps, y=sma_50, name='SMA 50', line=dict(color='#ff9800', width=1), opacity=0.7 )) # Update layout fig.update_layout( title=f"{self.symbol} - {interval_key} Chart ({len(candles)} candles)", template="plotly_dark", height=600, xaxis_title="Time", yaxis_title="Price ($)", legend=dict( yanchor="top", y=0.99, xanchor="left", x=0.01, bgcolor="rgba(0,0,0,0.5)" ), hovermode='x unified', dragmode='pan' ) # Remove range slider for better performance fig.update_layout(xaxis_rangeslider_visible=False) # Update the latest price if closes: self.latest_price = closes[-1] self.latest_timestamp = timestamps[-1] return fig except Exception as e: logger.error(f"Error updating main chart: {str(e)}") import traceback logger.error(traceback.format_exc()) # Return error figure fig = go.Figure() fig.add_annotation( text=f"Chart Error: {str(e)}", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False, font=dict(size=16, color="red") ) fig.update_layout( title="Chart Error", template="plotly_dark", height=600 ) return fig def set_trading_env(self, trading_env): """Set the trading environment to monitor for new trades""" self.trading_env = trading_env if hasattr(trading_env, 'add_trade_callback'): trading_env.add_trade_callback(self.add_trade) logger.info("Trading environment integrated with chart") def set_agent(self, agent): """Set the agent to monitor for trading decisions""" self.agent = agent logger.info("Agent integrated with chart") def update_from_env(self, env_data): """Update chart data from trading environment""" try: if 'latest_price' in env_data: self.latest_price = env_data['latest_price'] if 'balance' in env_data: self.current_balance = env_data['balance'] if 'pnl' in env_data: self.accumulative_pnl = env_data['pnl'] if 'trades' in env_data: # Add any new trades for trade in env_data['trades']: if trade not in self.all_trades: self.add_trade( action=trade.get('action', 'HOLD'), price=trade.get('price', self.latest_price), amount=trade.get('amount', 0.1), timestamp=trade.get('timestamp', datetime.now()), trade_id=trade.get('id') ) except Exception as e: logger.error(f"Error updating from environment: {str(e)}") def get_latest_data(self): """Get the latest data for external systems""" return { 'latest_price': self.latest_price, 'latest_volume': self.latest_volume, 'latest_timestamp': self.latest_timestamp, 'current_balance': self.current_balance, 'accumulative_pnl': self.accumulative_pnl, 'positions': len(self.positions), 'trade_count': len(self.all_trades), 'trade_rate': self._calculate_trade_rate() } async def start_websocket(self): """Start the websocket connection for real-time data""" try: logger.info("Starting websocket connection for real-time data") # Start the websocket data fetching websocket_url = "wss://stream.binance.com:9443/ws/ethusdt@ticker" async def websocket_handler(): """Handle websocket connection and data updates""" try: async with websockets.connect(websocket_url) as websocket: logger.info(f"WebSocket connected for {self.symbol}") message_count = 0 async for message in websocket: try: data = json.loads(message) # Update tick storage with new price data tick = { 'price': float(data['c']), # Current price 'volume': float(data['v']), # Volume 'timestamp': pd.Timestamp.now() } self.tick_storage.add_tick(tick) # Update chart's latest price and volume self.latest_price = float(data['c']) self.latest_volume = float(data['v']) self.latest_timestamp = pd.Timestamp.now() message_count += 1 # Log periodic updates if message_count % 100 == 0: logger.info(f"Received message #{message_count}") logger.info(f"Processed {message_count} ticks, current price: ${self.latest_price:.2f}") # Log candle counts candle_count = len(self.tick_storage.candles.get("1s", [])) logger.info(f"Current 1s candles count: {candle_count}") except json.JSONDecodeError as e: logger.warning(f"Failed to parse websocket message: {str(e)}") except Exception as e: logger.error(f"Error processing websocket message: {str(e)}") except websockets.exceptions.ConnectionClosed: logger.warning("WebSocket connection closed") except Exception as e: logger.error(f"WebSocket error: {str(e)}") # Start the websocket handler in the background await websocket_handler() except Exception as e: logger.error(f"Error starting websocket: {str(e)}") import traceback logger.error(traceback.format_exc()) def run(self, host='127.0.0.1', port=8050, debug=False): """Run the Dash app""" try: if self.app is None: logger.error("No Dash app instance available") return logger.info("="*60) logger.info("🔗 ACCESS WEB UI AT: http://localhost:8050/") logger.info("📊 View live trading data and charts in your browser") logger.info("="*60) # Run the app - FIXED: Updated for newer Dash versions self.app.run( host=host, port=port, debug=debug, use_reloader=False, # Disable reloader to avoid conflicts threaded=True # Enable threading for better performance ) except Exception as e: logger.error(f"Error running Dash app: {str(e)}") import traceback logger.error(traceback.format_exc())