# import asyncio # import json # import logging # # Fix PIL import issue that causes plotly JSON serialization errors # import os # os.environ['MPLBACKEND'] = 'Agg' # Use non-interactive backend # try: # # Try to fix PIL import issue # import PIL.Image # # Disable PIL in plotly to prevent circular import issues # import plotly.io as pio # pio.kaleido.scope.default_format = "png" # except ImportError: # pass # except Exception: # # Suppress any PIL-related errors during import # pass # from typing import Dict, List, Optional, Tuple, Union # import websockets # import plotly.graph_objects as go # from plotly.subplots import make_subplots # import dash # from dash import html, dcc # from dash.dependencies import Input, Output # import pandas as pd # import numpy as np # from collections import deque # import time # from threading import Thread # import requests # import os # from datetime import datetime, timedelta # import pytz # import tzlocal # import threading # import random # import dash_bootstrap_components as dbc # import uuid # import ta # from sklearn.preprocessing import MinMaxScaler # import re # import psutil # import gc # import websocket # # Import psycopg2 with error handling # try: # import psycopg2 # PSYCOPG2_AVAILABLE = True # except ImportError: # PSYCOPG2_AVAILABLE = False # psycopg2 = None # # TimescaleDB configuration from environment variables # TIMESCALEDB_ENABLED = os.environ.get('TIMESCALEDB_ENABLED', '1') == '1' and PSYCOPG2_AVAILABLE # TIMESCALEDB_HOST = os.environ.get('TIMESCALEDB_HOST', '192.168.0.10') # TIMESCALEDB_PORT = int(os.environ.get('TIMESCALEDB_PORT', '5432')) # TIMESCALEDB_USER = os.environ.get('TIMESCALEDB_USER', 'postgres') # TIMESCALEDB_PASSWORD = os.environ.get('TIMESCALEDB_PASSWORD', 'timescaledbpass') # TIMESCALEDB_DB = os.environ.get('TIMESCALEDB_DB', 'candles') # class TimescaleDBHandler: # """Handler for TimescaleDB operations for candle storage and retrieval""" # def __init__(self): # """Initialize TimescaleDB connection if enabled""" # self.enabled = TIMESCALEDB_ENABLED # self.conn = None # if not self.enabled: # if not PSYCOPG2_AVAILABLE: # print("psycopg2 module not available. TimescaleDB integration disabled.") # return # try: # # Connect to TimescaleDB # self.conn = psycopg2.connect( # host=TIMESCALEDB_HOST, # port=TIMESCALEDB_PORT, # user=TIMESCALEDB_USER, # password=TIMESCALEDB_PASSWORD, # dbname=TIMESCALEDB_DB # ) # print(f"Connected to TimescaleDB at {TIMESCALEDB_HOST}:{TIMESCALEDB_PORT}") # # Ensure the candles table exists # self._ensure_table() # print("TimescaleDB integration initialized successfully") # except Exception as e: # print(f"Error connecting to TimescaleDB: {str(e)}") # self.enabled = False # self.conn = None # def _ensure_table(self): # """Ensure the candles table exists with TimescaleDB hypertable""" # if not self.conn: # return # try: # with self.conn.cursor() as cur: # # Create the candles table if it doesn't exist # cur.execute(''' # CREATE TABLE IF NOT EXISTS candles ( # symbol TEXT, # interval TEXT, # timestamp TIMESTAMPTZ, # open DOUBLE PRECISION, # high DOUBLE PRECISION, # low DOUBLE PRECISION, # close DOUBLE PRECISION, # volume DOUBLE PRECISION, # PRIMARY KEY (symbol, interval, timestamp) # ); # ''') # # Check if the table is already a hypertable # cur.execute(''' # SELECT EXISTS ( # SELECT 1 FROM timescaledb_information.hypertables # WHERE hypertable_name = 'candles' # ); # ''') # is_hypertable = cur.fetchone()[0] # # Convert to hypertable if not already done # if not is_hypertable: # cur.execute(''' # SELECT create_hypertable('candles', 'timestamp', # if_not_exists => TRUE, # migrate_data => TRUE # ); # ''') # self.conn.commit() # print("TimescaleDB table structure verified") # except Exception as e: # print(f"Error setting up TimescaleDB tables: {str(e)}") # self.enabled = False # def upsert_candle(self, symbol, interval, candle): # """Insert or update a candle in TimescaleDB""" # if not self.enabled or not self.conn: # return False # try: # with self.conn.cursor() as cur: # cur.execute(''' # INSERT INTO candles ( # symbol, interval, timestamp, # open, high, low, close, volume # ) # VALUES (%s, %s, %s, %s, %s, %s, %s, %s) # ON CONFLICT (symbol, interval, timestamp) # DO UPDATE SET # open = EXCLUDED.open, # high = EXCLUDED.high, # low = EXCLUDED.low, # close = EXCLUDED.close, # volume = EXCLUDED.volume # ''', ( # symbol, interval, candle['timestamp'], # candle['open'], candle['high'], candle['low'], # candle['close'], candle['volume'] # )) # self.conn.commit() # return True # except Exception as e: # print(f"Error upserting candle to TimescaleDB: {str(e)}") # # Try to reconnect on error # try: # self.conn = psycopg2.connect( # host=TIMESCALEDB_HOST, # port=TIMESCALEDB_PORT, # user=TIMESCALEDB_USER, # password=TIMESCALEDB_PASSWORD, # dbname=TIMESCALEDB_DB # ) # except: # pass # return False # def fetch_candles(self, symbol, interval, limit=1000): # """Fetch candles from TimescaleDB""" # if not self.enabled or not self.conn: # return [] # try: # with self.conn.cursor() as cur: # cur.execute(''' # SELECT timestamp, open, high, low, close, volume # FROM candles # WHERE symbol = %s AND interval = %s # ORDER BY timestamp DESC # LIMIT %s # ''', (symbol, interval, limit)) # rows = cur.fetchall() # # Convert to list of dictionaries (ordered from oldest to newest) # candles = [] # for row in reversed(rows): # Reverse to get oldest first # candle = { # 'timestamp': row[0], # 'open': row[1], # 'high': row[2], # 'low': row[3], # 'close': row[4], # 'volume': row[5] # } # candles.append(candle) # return candles # except Exception as e: # print(f"Error fetching candles from TimescaleDB: {str(e)}") # # Try to reconnect on error # try: # self.conn = psycopg2.connect( # host=TIMESCALEDB_HOST, # port=TIMESCALEDB_PORT, # user=TIMESCALEDB_USER, # password=TIMESCALEDB_PASSWORD, # dbname=TIMESCALEDB_DB # ) # except: # pass # return [] # class BinanceHistoricalData: # """ # Class for fetching historical price data from Binance. # """ # def __init__(self): # self.base_url = "https://api.binance.com/api/v3" # self.cache_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cache') # if not os.path.exists(self.cache_dir): # os.makedirs(self.cache_dir) # # Timestamp of last data update # self.last_update = None # def get_historical_candles(self, symbol, interval_seconds=3600, limit=1000): # """ # Fetch historical candles from Binance API. # Args: # symbol (str): Trading pair symbol (e.g., "BTC/USDT") # interval_seconds (int): Timeframe in seconds (e.g., 3600 for 1h) # limit (int): Number of candles to fetch # Returns: # pd.DataFrame: DataFrame with OHLCV data # """ # # Convert interval_seconds to Binance interval format # interval_map = { # 1: "1s", # 60: "1m", # 300: "5m", # 900: "15m", # 1800: "30m", # 3600: "1h", # 14400: "4h", # 86400: "1d" # } # interval = interval_map.get(interval_seconds, "1h") # # Format symbol for Binance API (remove slash and make uppercase) # formatted_symbol = symbol.replace("/", "").upper() # # Check if we have cached data first # cache_file = self._get_cache_filename(formatted_symbol, interval) # cached_data = self._load_from_cache(formatted_symbol, interval) # # If we have cached data that's recent enough, use it # if cached_data is not None and len(cached_data) >= limit: # cache_age_minutes = (datetime.now() - self.last_update).total_seconds() / 60 if self.last_update else 60 # if cache_age_minutes < 15: # Only use cache if it's less than 15 minutes old # logger.info(f"Using cached historical data for {symbol} ({interval})") # return cached_data # try: # # Build URL for klines endpoint # url = f"{self.base_url}/klines" # params = { # "symbol": formatted_symbol, # "interval": interval, # "limit": limit # } # # Make the request # response = requests.get(url, params=params) # response.raise_for_status() # # Parse the response # data = response.json() # # Create dataframe # df = pd.DataFrame(data, columns=[ # "timestamp", "open", "high", "low", "close", "volume", # "close_time", "quote_asset_volume", "number_of_trades", # "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" # ]) # # Convert timestamp to datetime # df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms") # # Convert price columns to float # for col in ["open", "high", "low", "close", "volume"]: # df[col] = df[col].astype(float) # # Sort by timestamp # df = df.sort_values("timestamp") # # Save to cache for future use # self._save_to_cache(df, formatted_symbol, interval) # self.last_update = datetime.now() # logger.info(f"Fetched {len(df)} candles for {symbol} ({interval})") # return df # except Exception as e: # logger.error(f"Error fetching historical data from Binance: {str(e)}") # # Return cached data if we have it, even if it's not enough # if cached_data is not None: # logger.warning(f"Using cached data instead (may be incomplete)") # return cached_data # # Return empty dataframe on error # return pd.DataFrame() # def _get_cache_filename(self, symbol, interval): # """Get filename for cache file""" # return os.path.join(self.cache_dir, f"{symbol}_{interval}_candles.csv") # def _load_from_cache(self, symbol, interval): # """Load candles from cache file""" # try: # cache_file = self._get_cache_filename(symbol, interval) # if os.path.exists(cache_file): # # For 1s interval, check if the cache is recent (less than 10 minutes old) # if interval == "1s" or interval == 1: # file_mod_time = datetime.fromtimestamp(os.path.getmtime(cache_file)) # time_diff = (datetime.now() - file_mod_time).total_seconds() / 60 # if time_diff > 10: # logger.info("1s cache is older than 10 minutes, skipping load") # return None # logger.info(f"Using recent 1s cache (age: {time_diff:.1f} minutes)") # df = pd.read_csv(cache_file) # df["timestamp"] = pd.to_datetime(df["timestamp"]) # logger.info(f"Loaded {len(df)} candles from cache: {cache_file}") # return df # except Exception as e: # logger.error(f"Error loading cached data: {str(e)}") # return None # def _save_to_cache(self, df, symbol, interval): # """Save candles to cache file""" # try: # cache_file = self._get_cache_filename(symbol, interval) # df.to_csv(cache_file, index=False) # logger.info(f"Saved {len(df)} candles to cache: {cache_file}") # return True # except Exception as e: # logger.error(f"Error saving to cache: {str(e)}") # return False # def get_recent_trades(self, symbol, limit=1000): # """Get recent trades for a symbol""" # formatted_symbol = symbol.replace("/", "") # try: # url = f"{self.base_url}/trades" # params = { # "symbol": formatted_symbol, # "limit": limit # } # response = requests.get(url, params=params) # response.raise_for_status() # data = response.json() # # Create dataframe # df = pd.DataFrame(data) # df["time"] = pd.to_datetime(df["time"], unit="ms") # df["price"] = df["price"].astype(float) # df["qty"] = df["qty"].astype(float) # return df # except Exception as e: # logger.error(f"Error fetching recent trades: {str(e)}") # return pd.DataFrame() # class MultiTimeframeDataInterface: # """ # Enhanced Data Interface supporting: # - Multiple trading pairs # - Multiple timeframes per pair (1s, 1m, 1h, 1d + custom) # - Technical indicators # - Cross-timeframe normalization # - Real-time data updates # """ # def __init__(self, symbol=None, timeframes=None, data_dir="data"): # """ # Initialize the data interface. # Args: # symbol (str): Trading pair symbol (e.g., "BTC/USDT") # timeframes (list): List of timeframes to use (e.g., ['1m', '5m', '1h', '4h', '1d']) # data_dir (str): Directory to store/load datasets # """ # self.symbol = symbol # self.timeframes = timeframes or ['1h', '4h', '1d'] # self.data_dir = data_dir # self.scalers = {} # Store scalers for each timeframe # # Initialize the historical data fetcher # self.historical_data = BinanceHistoricalData() # # Create data directory if it doesn't exist # os.makedirs(self.data_dir, exist_ok=True) # # Initialize empty dataframes for each timeframe # self.dataframes = {tf: None for tf in self.timeframes} # # Store timestamps of last updates per timeframe # self.last_updates = {tf: None for tf in self.timeframes} # # Timeframe mapping (string to seconds) # self.timeframe_to_seconds = { # '1s': 1, # '1m': 60, # '5m': 300, # '15m': 900, # '30m': 1800, # '1h': 3600, # '4h': 14400, # '1d': 86400 # } # logger.info(f"MultiTimeframeDataInterface initialized for {symbol} with timeframes {timeframes}") # def get_data(self, timeframe='1h', n_candles=1000, refresh=False, add_indicators=True): # """ # Fetch historical price data for a given timeframe with optional indicators. # Args: # timeframe (str): Timeframe to fetch data for # n_candles (int): Number of candles to fetch # refresh (bool): Force refresh of the data # add_indicators (bool): Whether to add technical indicators # Returns: # pd.DataFrame: DataFrame with OHLCV data and indicators # """ # # Check if we need to refresh # current_time = datetime.now() # if (not refresh and # self.dataframes[timeframe] is not None and # self.last_updates[timeframe] is not None and # (current_time - self.last_updates[timeframe]).total_seconds() < 60): # #logger.info(f"Using cached data for {self.symbol} {timeframe}") # return self.dataframes[timeframe] # interval_seconds = self.timeframe_to_seconds.get(timeframe, 3600) # # Fetch data # df = self.historical_data.get_historical_candles( # symbol=self.symbol, # interval_seconds=interval_seconds, # limit=n_candles # ) # if df is None or df.empty: # logger.error(f"No data available for {self.symbol} {timeframe}") # return None # # Add indicators if requested # if add_indicators: # df = self.add_indicators(df) # # Store in cache # self.dataframes[timeframe] = df # self.last_updates[timeframe] = current_time # logger.info(f"Fetched and processed {len(df)} candles for {self.symbol} {timeframe}") # return df # def add_indicators(self, df): # """ # Add comprehensive technical indicators to the dataframe. # Args: # df (pd.DataFrame): DataFrame with OHLCV data # Returns: # pd.DataFrame: DataFrame with added technical indicators # """ # # Make a copy to avoid modifying the original # df_copy = df.copy() # # Basic price indicators # df_copy['returns'] = df_copy['close'].pct_change() # df_copy['log_returns'] = np.log(df_copy['close'] / df_copy['close'].shift(1)) # # Moving Averages # df_copy['sma_7'] = ta.trend.sma_indicator(df_copy['close'], window=7) # df_copy['sma_25'] = ta.trend.sma_indicator(df_copy['close'], window=25) # df_copy['sma_99'] = ta.trend.sma_indicator(df_copy['close'], window=99) # df_copy['ema_9'] = ta.trend.ema_indicator(df_copy['close'], window=9) # df_copy['ema_21'] = ta.trend.ema_indicator(df_copy['close'], window=21) # # MACD # macd = ta.trend.MACD(df_copy['close']) # df_copy['macd'] = macd.macd() # df_copy['macd_signal'] = macd.macd_signal() # df_copy['macd_diff'] = macd.macd_diff() # # RSI # df_copy['rsi'] = ta.momentum.rsi(df_copy['close'], window=14) # # Bollinger Bands # bollinger = ta.volatility.BollingerBands(df_copy['close']) # df_copy['bb_high'] = bollinger.bollinger_hband() # df_copy['bb_low'] = bollinger.bollinger_lband() # df_copy['bb_pct'] = bollinger.bollinger_pband() # # Stochastic Oscillator # stoch = ta.momentum.StochasticOscillator(df_copy['high'], df_copy['low'], df_copy['close']) # df_copy['stoch_k'] = stoch.stoch() # df_copy['stoch_d'] = stoch.stoch_signal() # # ATR - Average True Range # df_copy['atr'] = ta.volatility.average_true_range(df_copy['high'], df_copy['low'], df_copy['close'], window=14) # # Money Flow Index # df_copy['mfi'] = ta.volume.money_flow_index(df_copy['high'], df_copy['low'], df_copy['close'], df_copy['volume'], window=14) # # OBV - On-Balance Volume # df_copy['obv'] = ta.volume.on_balance_volume(df_copy['close'], df_copy['volume']) # # Ichimoku Cloud # ichimoku = ta.trend.IchimokuIndicator(df_copy['high'], df_copy['low']) # df_copy['ichimoku_a'] = ichimoku.ichimoku_a() # df_copy['ichimoku_b'] = ichimoku.ichimoku_b() # df_copy['ichimoku_base'] = ichimoku.ichimoku_base_line() # df_copy['ichimoku_conv'] = ichimoku.ichimoku_conversion_line() # # ADX - Average Directional Index # adx = ta.trend.ADXIndicator(df_copy['high'], df_copy['low'], df_copy['close']) # df_copy['adx'] = adx.adx() # df_copy['adx_pos'] = adx.adx_pos() # df_copy['adx_neg'] = adx.adx_neg() # # VWAP - Volume Weighted Average Price (intraday) # # Custom calculation since TA library doesn't include VWAP # df_copy['vwap'] = (df_copy['volume'] * (df_copy['high'] + df_copy['low'] + df_copy['close']) / 3).cumsum() / df_copy['volume'].cumsum() # # Fill NaN values # df_copy = df_copy.fillna(method='bfill').fillna(0) # return df_copy # def get_multi_timeframe_data(self, timeframes=None, n_candles=1000, refresh=False, add_indicators=True): # """ # Fetch data for multiple timeframes. # Args: # timeframes (list): List of timeframes to fetch # n_candles (int): Number of candles to fetch for each timeframe # refresh (bool): Force refresh of the data # add_indicators (bool): Whether to add technical indicators # Returns: # dict: Dictionary of dataframes indexed by timeframe # """ # if timeframes is None: # timeframes = self.timeframes # result = {} # for tf in timeframes: # # For higher timeframes, we need fewer candles # tf_candles = n_candles # if tf == '4h': # tf_candles = max(250, n_candles // 4) # elif tf == '1d': # tf_candles = max(100, n_candles // 24) # df = self.get_data(timeframe=tf, n_candles=tf_candles, refresh=refresh, add_indicators=add_indicators) # if df is not None and not df.empty: # result[tf] = df # return result # def prepare_training_data(self, window_size=20, train_ratio=0.8, refresh=False): # """ # Prepare training data from multiple timeframes. # Args: # window_size (int): Size of the sliding window # train_ratio (float): Ratio of data to use for training # refresh (bool): Whether to refresh the data # Returns: # tuple: (X_train, y_train, X_val, y_val, train_prices, val_prices) # """ # # Get data for all timeframes # data_dict = self.get_multi_timeframe_data(refresh=refresh) # if not data_dict: # logger.error("Failed to fetch data for any timeframe") # return None, None, None, None, None, None # # Align all dataframes by timestamp # all_dfs = list(data_dict.values()) # min_date = max([df['timestamp'].min() for df in all_dfs]) # max_date = min([df['timestamp'].max() for df in all_dfs]) # aligned_dfs = {} # for tf, df in data_dict.items(): # aligned_df = df[(df['timestamp'] >= min_date) & (df['timestamp'] <= max_date)] # aligned_dfs[tf] = aligned_df # # Choose the lowest timeframe as the reference for time alignment # reference_tf = min(self.timeframes, key=lambda x: self.timeframe_to_seconds.get(x, 3600)) # reference_df = aligned_dfs[reference_tf] # # Create sliding windows for each timeframe # X_dict = {} # for tf, df in aligned_dfs.items(): # # Drop timestamp and create numeric features # features = df.drop('timestamp', axis=1).values # # Ensure the feature array is 3D: [samples, window, features] # X = np.array([features[i:i+window_size] for i in range(len(features)-window_size)]) # X_dict[tf] = X # # Create target labels based on future price movements # reference_prices = reference_df['close'].values # future_prices = reference_prices[window_size:] # current_prices = reference_prices[window_size-1:-1] # # Calculate returns # returns = (future_prices - current_prices) / current_prices # # Create labels: 0=SELL, 1=HOLD, 2=BUY # threshold = 0.0005 # 0.05% threshold # y = np.zeros(len(returns), dtype=int) # y[returns > threshold] = 2 # BUY # y[returns < -threshold] = 0 # SELL # y[(returns >= -threshold) & (returns <= threshold)] = 1 # HOLD # # Split into training and validation sets # split_idx = int(len(y) * train_ratio) # X_train_dict = {tf: X[:split_idx] for tf, X in X_dict.items()} # X_val_dict = {tf: X[split_idx:] for tf, X in X_dict.items()} # y_train = y[:split_idx] # y_val = y[split_idx:] # train_prices = reference_prices[window_size-1:window_size-1+split_idx] # val_prices = reference_prices[window_size-1+split_idx:window_size-1+len(y)] # logger.info(f"Prepared training data - Train: {len(y_train)}, Val: {len(y_val)}") # return X_train_dict, y_train, X_val_dict, y_val, train_prices, val_prices # def normalize_data(self, data_dict, fit=True): # """ # Normalize data across all timeframes. # Args: # data_dict (dict): Dictionary of data arrays by timeframe # fit (bool): Whether to fit new scalers or use existing ones # Returns: # dict: Dictionary of normalized data arrays # """ # result = {} # for tf, data in data_dict.items(): # # For 3D data [samples, window, features] # if len(data.shape) == 3: # samples, window, features = data.shape # reshaped = data.reshape(-1, features) # if fit or tf not in self.scalers: # self.scalers[tf] = MinMaxScaler() # normalized = self.scalers[tf].fit_transform(reshaped) # else: # normalized = self.scalers[tf].transform(reshaped) # result[tf] = normalized.reshape(samples, window, features) # # For 2D data [samples, features] # elif len(data.shape) == 2: # if fit or tf not in self.scalers: # self.scalers[tf] = MinMaxScaler() # result[tf] = self.scalers[tf].fit_transform(data) # else: # result[tf] = self.scalers[tf].transform(data) # return result # def get_realtime_features(self, timeframes=None, window_size=20): # """ # Get the most recent data for real-time prediction. # Args: # timeframes (list): List of timeframes to use # window_size (int): Size of the sliding window # Returns: # dict: Dictionary of feature arrays for the latest window # """ # if timeframes is None: # timeframes = self.timeframes # # Get fresh data # data_dict = self.get_multi_timeframe_data(timeframes=timeframes, refresh=True) # result = {} # for tf, df in data_dict.items(): # if len(df) < window_size: # logger.warning(f"Not enough data for {tf} (need {window_size}, got {len(df)})") # continue # # Get the latest window # latest_data = df.tail(window_size).drop('timestamp', axis=1).values # # Add extra dimension to match model input shape [1, window_size, features] # result[tf] = latest_data.reshape(1, window_size, -1) # # Apply normalization using existing scalers # if self.scalers: # result = self.normalize_data(result, fit=False) # return result # def calculate_pnl(self, predictions, prices, position_size=1.0, fee_rate=0.0002): # """ # Calculate PnL and win rate from predictions. # Args: # predictions (np.ndarray): Array of predicted actions (0=SELL, 1=HOLD, 2=BUY) # prices (np.ndarray): Array of prices # position_size (float): Size of each position # fee_rate (float): Trading fee rate (default: 0.0002 for 0.02% per trade) # Returns: # tuple: (total_pnl, win_rate, trades) # """ # if len(predictions) < 2 or len(prices) < 2: # return 0.0, 0.0, [] # # Ensure arrays are the same length # min_len = min(len(predictions), len(prices)-1) # actions = predictions[:min_len] # pnl = 0.0 # wins = 0 # trades = [] # for i in range(min_len): # current_price = prices[i] # next_price = prices[i+1] # action = actions[i] # # Skip HOLD actions # if action == 1: # continue # price_change = (next_price - current_price) / current_price # if action == 2: # BUY # # Calculate raw PnL # raw_pnl = price_change * position_size # # Calculate fees (entry and exit) # entry_fee = position_size * fee_rate # exit_fee = position_size * (1 + price_change) * fee_rate # total_fees = entry_fee + exit_fee # # Net PnL after fees # trade_pnl = raw_pnl - total_fees # trade_type = 'BUY' # is_win = trade_pnl > 0 # elif action == 0: # SELL # # Calculate raw PnL # raw_pnl = -price_change * position_size # # Calculate fees (entry and exit) # entry_fee = position_size * fee_rate # exit_fee = position_size * (1 - price_change) * fee_rate # total_fees = entry_fee + exit_fee # # Net PnL after fees # trade_pnl = raw_pnl - total_fees # trade_type = 'SELL' # is_win = trade_pnl > 0 # else: # continue # pnl += trade_pnl # wins += int(is_win) # trades.append({ # 'type': trade_type, # 'entry': float(current_price), # Ensure serializable # 'exit': float(next_price), # 'raw_pnl': float(raw_pnl), # 'fees': float(total_fees), # 'pnl': float(trade_pnl), # 'win': bool(is_win), # 'timestamp': datetime.now().isoformat() # Add timestamp # }) # win_rate = wins / len(trades) if trades else 0.0 # return float(pnl), float(win_rate), trades # # Configure logging with more detailed format # logging.basicConfig( # level=logging.INFO, # Changed to DEBUG for more detailed logs # format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', # handlers=[ # logging.StreamHandler(), # logging.FileHandler('realtime_chart.log') # ] # ) # logger = logging.getLogger(__name__) # # Neural Network integration (conditional import) # NN_ENABLED = os.environ.get('ENABLE_NN_MODELS', '0') == '1' # nn_orchestrator = None # nn_inference_thread = None # if NN_ENABLED: # try: # import sys # # Add project root to sys.path if needed # project_root = os.path.dirname(os.path.abspath(__file__)) # if project_root not in sys.path: # sys.path.append(project_root) # from NN.main import NeuralNetworkOrchestrator # logger.info("Neural Network module enabled") # except ImportError as e: # logger.warning(f"Failed to import Neural Network module, disabling NN features: {str(e)}") # NN_ENABLED = False # # NN utility functions # def setup_neural_network(): # """Initialize the neural network components if enabled""" # global nn_orchestrator, NN_ENABLED # if not NN_ENABLED: # return False # try: # # Get configuration from environment variables or use defaults # symbol = os.environ.get('NN_SYMBOL', 'ETH/USDT') # timeframes = os.environ.get('NN_TIMEFRAMES', '1m,5m,1h,4h,1d').split(',') # output_size = int(os.environ.get('NN_OUTPUT_SIZE', '3')) # 3 for BUY/HOLD/SELL # # Configure the orchestrator # config = { # 'symbol': symbol, # 'timeframes': timeframes, # 'window_size': int(os.environ.get('NN_WINDOW_SIZE', '20')), # 'n_features': 5, # OHLCV # 'output_size': output_size, # 'model_dir': 'NN/models/saved', # 'data_dir': 'NN/data' # } # # Initialize the orchestrator # logger.info(f"Initializing Neural Network Orchestrator with config: {config}") # nn_orchestrator = NeuralNetworkOrchestrator(config) # # Load the model # model_loaded = nn_orchestrator.load_model() # if not model_loaded: # logger.warning("Failed to load neural network model. Using untrained model.") # return model_loaded # except Exception as e: # logger.error(f"Error setting up neural network: {str(e)}") # NN_ENABLED = False # return False # def start_nn_inference_thread(interval_seconds): # """Start a background thread to periodically run inference with the neural network""" # global nn_inference_thread # if not NN_ENABLED or nn_orchestrator is None: # logger.warning("Cannot start inference thread - Neural Network not enabled or initialized") # return False # def inference_worker(): # """Worker function for the inference thread""" # model_type = os.environ.get('NN_MODEL_TYPE', 'cnn') # timeframe = os.environ.get('NN_TIMEFRAME', '1h') # logger.info(f"Starting neural network inference thread with {interval_seconds}s interval") # logger.info(f"Using model type: {model_type}, timeframe: {timeframe}") # # Wait a bit for charts to initialize # time.sleep(5) # # Track active charts # active_charts = [] # while True: # try: # # Find active charts if we don't have them yet # if not active_charts and 'charts' in globals(): # active_charts = globals()['charts'] # logger.info(f"Found {len(active_charts)} active charts for NN signals") # # Run inference # result = nn_orchestrator.run_inference_pipeline( # model_type=model_type, # timeframe=timeframe # ) # if result: # # Log the result # logger.info(f"Neural network inference result: {result}") # # Add signal to charts # if active_charts: # try: # if 'action' in result: # action = result['action'] # timestamp = datetime.fromisoformat(result['timestamp'].replace('Z', '+00:00')) # # Get probability if available # probability = None # if 'probability' in result: # probability = result['probability'] # elif 'probabilities' in result: # probability = result['probabilities'].get(action, None) # # Add signal to each chart # for chart in active_charts: # if hasattr(chart, 'add_nn_signal'): # chart.add_nn_signal(action, timestamp, probability) # except Exception as e: # logger.error(f"Error adding NN signal to chart: {str(e)}") # import traceback # logger.error(traceback.format_exc()) # # Sleep for the interval # time.sleep(interval_seconds) # except Exception as e: # logger.error(f"Error in inference thread: {str(e)}") # import traceback # logger.error(traceback.format_exc()) # time.sleep(5) # Wait a bit before retrying # # Create and start the thread # nn_inference_thread = threading.Thread(target=inference_worker, daemon=True) # nn_inference_thread.start() # return True # # Try to get local timezone, default to Sofia/EET if not available # try: # local_timezone = tzlocal.get_localzone() # # Get timezone name safely # try: # tz_name = str(local_timezone) # # Handle case where it might be zoneinfo.ZoneInfo object instead of pytz timezone # if hasattr(local_timezone, 'zone'): # tz_name = local_timezone.zone # elif hasattr(local_timezone, 'key'): # tz_name = local_timezone.key # else: # tz_name = str(local_timezone) # except: # tz_name = "Local" # logger.info(f"Detected local timezone: {local_timezone} ({tz_name})") # except Exception as e: # logger.warning(f"Could not detect local timezone: {str(e)}. Defaulting to Sofia/EET") # local_timezone = pytz.timezone('Europe/Sofia') # tz_name = "Europe/Sofia" # def convert_to_local_time(timestamp): # """Convert timestamp to local timezone""" # try: # if isinstance(timestamp, pd.Timestamp): # dt = timestamp.to_pydatetime() # elif isinstance(timestamp, np.datetime64): # dt = pd.Timestamp(timestamp).to_pydatetime() # elif isinstance(timestamp, str): # dt = pd.to_datetime(timestamp).to_pydatetime() # else: # dt = timestamp # # If datetime is naive (no timezone), assume it's UTC # if dt.tzinfo is None: # dt = dt.replace(tzinfo=pytz.UTC) # # Convert to local timezone # local_dt = dt.astimezone(local_timezone) # return local_dt # except Exception as e: # logger.error(f"Error converting timestamp to local time: {str(e)}") # return timestamp # # Initialize TimescaleDB handler - only once, at module level # timescaledb_handler = TimescaleDBHandler() if TIMESCALEDB_ENABLED else None # class TickStorage: # def __init__(self, symbol, timeframes=None, use_timescaledb=False): # """Initialize the tick storage for a specific symbol""" # self.symbol = symbol # self.timeframes = timeframes or ["1s", "5m", "15m", "1h", "4h", "1d"] # self.ticks = [] # self.candles = {tf: [] for tf in self.timeframes} # self.current_candle = {tf: None for tf in self.timeframes} # self.last_candle_timestamp = {tf: None for tf in self.timeframes} # self.cache_dir = os.path.join(os.getcwd(), "cache", symbol.replace("/", "")) # self.cache_path = os.path.join(self.cache_dir, f"{symbol.replace('/', '')}_ticks.json") # Add missing cache_path # self.use_timescaledb = use_timescaledb # self.max_ticks = 10000 # Maximum number of ticks to store in memory # # Create cache directory if it doesn't exist # os.makedirs(self.cache_dir, exist_ok=True) # logger.info(f"Creating new tick storage for {symbol} with timeframes {self.timeframes}") # logger.info(f"Cache directory: {self.cache_dir}") # logger.info(f"Cache file: {self.cache_path}") # if use_timescaledb: # print(f"TickStorage: TimescaleDB integration is ENABLED for {symbol}") # else: # logger.info(f"TickStorage: TimescaleDB integration is DISABLED for {symbol}") # def _save_to_cache(self): # """Save ticks to a cache file""" # try: # # Only save the latest 5000 ticks to avoid giant files # ticks_to_save = self.ticks[-5000:] if len(self.ticks) > 5000 else self.ticks # # Convert pandas Timestamps to ISO strings for JSON serialization # serializable_ticks = [] # for tick in ticks_to_save: # serializable_tick = tick.copy() # if isinstance(tick['timestamp'], pd.Timestamp): # serializable_tick['timestamp'] = tick['timestamp'].isoformat() # elif hasattr(tick['timestamp'], 'isoformat'): # serializable_tick['timestamp'] = tick['timestamp'].isoformat() # else: # # Keep as is if it's already a string or number # serializable_tick['timestamp'] = tick['timestamp'] # serializable_ticks.append(serializable_tick) # with open(self.cache_path, 'w') as f: # json.dump(serializable_ticks, f) # logger.debug(f"Saved {len(serializable_ticks)} ticks to cache") # except Exception as e: # logger.error(f"Error saving ticks to cache: {e}") # def _load_from_cache(self): # """Load ticks from cache if available""" # if os.path.exists(self.cache_path): # try: # # Check if the cache file is recent (< 10 minutes old) # cache_age = time.time() - os.path.getmtime(self.cache_path) # if cache_age > 600: # 10 minutes in seconds # logger.warning(f"Cache file is {cache_age:.1f} seconds old (>10 min). Not using it.") # return False # with open(self.cache_path, 'r') as f: # cached_ticks = json.load(f) # if cached_ticks: # # Convert ISO strings back to pandas Timestamps # processed_ticks = [] # for tick in cached_ticks: # processed_tick = tick.copy() # if isinstance(tick['timestamp'], str): # try: # processed_tick['timestamp'] = pd.Timestamp(tick['timestamp']) # except: # # If parsing fails, use current time # processed_tick['timestamp'] = pd.Timestamp.now() # else: # # Convert to pandas Timestamp if it's a number (milliseconds) # processed_tick['timestamp'] = pd.Timestamp(tick['timestamp'], unit='ms') # processed_ticks.append(processed_tick) # self.ticks = processed_ticks # logger.info(f"Loaded {len(cached_ticks)} ticks from cache") # return True # except Exception as e: # logger.error(f"Error loading ticks from cache: {e}") # return False # def add_tick(self, tick=None, price=None, volume=None, timestamp=None): # """ # Add a tick to the storage and update candles for all timeframes # Args: # tick (dict, optional): A tick object containing price, quantity and timestamp # price (float, optional): Price of the tick (used in older interface) # volume (float, optional): Volume of the tick (used in older interface) # timestamp (datetime, optional): Timestamp of the tick (used in older interface) # """ # # Handle tick as a dict or separate parameters for backward compatibility # if tick is not None and isinstance(tick, dict): # # Using the new interface with a tick object # price = tick['price'] # volume = tick.get('quantity', 0) # timestamp = tick['timestamp'] # elif price is not None: # # Using the old interface with separate parameters # # Convert datetime to pd.Timestamp if needed # if timestamp is not None and not isinstance(timestamp, pd.Timestamp): # timestamp = pd.Timestamp(timestamp) # else: # logger.error("Invalid tick: must provide either a tick dict or price") # return # # Ensure timestamp is a pandas Timestamp # if not isinstance(timestamp, pd.Timestamp): # if isinstance(timestamp, (int, float)): # # Assume it's milliseconds # timestamp = pd.Timestamp(timestamp, unit='ms') # else: # # Try to parse as string or datetime # timestamp = pd.Timestamp(timestamp) # # Create tick object with consistent pandas Timestamp # tick_obj = { # 'price': float(price), # 'quantity': float(volume) if volume is not None else 0.0, # 'timestamp': timestamp # } # # Add to the list of ticks # self.ticks.append(tick_obj) # # Limit the number of ticks to avoid memory issues # if len(self.ticks) > self.max_ticks: # self.ticks = self.ticks[-self.max_ticks:] # # Update candles for all timeframes # for timeframe in self.timeframes: # if timeframe == "1s": # self._update_1s_candle(tick_obj) # else: # self._update_candles_for_timeframe(timeframe, tick_obj) # # Cache to disk periodically # self._try_cache_ticks() # def _update_1s_candle(self, tick): # """Update the 1-second candle with the new tick""" # # Get timestamp for the start of the current second # tick_timestamp = tick['timestamp'] # candle_timestamp = pd.Timestamp(int(tick_timestamp.timestamp() // 1 * 1_000_000_000)) # # Check if we need to create a new candle # if self.current_candle["1s"] is None or self.current_candle["1s"]["timestamp"] != candle_timestamp: # # If we have a current candle, finalize it and add to candles list # if self.current_candle["1s"] is not None: # # Add the completed candle to the list # self.candles["1s"].append(self.current_candle["1s"]) # # Limit the number of stored candles to prevent memory issues # if len(self.candles["1s"]) > 3600: # Keep last hour of 1s candles # self.candles["1s"] = self.candles["1s"][-3600:] # # Store in TimescaleDB if enabled # if self.use_timescaledb: # timescaledb_handler.upsert_candle( # self.symbol, "1s", self.current_candle["1s"] # ) # # Log completed candle for debugging # logger.debug(f"Completed 1s candle: {self.current_candle['1s']['timestamp']} - Close: {self.current_candle['1s']['close']}") # # Create a new candle # self.current_candle["1s"] = { # "timestamp": candle_timestamp, # "open": float(tick["price"]), # "high": float(tick["price"]), # "low": float(tick["price"]), # "close": float(tick["price"]), # "volume": float(tick["quantity"]) if "quantity" in tick else 0.0 # } # # Update last candle timestamp # self.last_candle_timestamp["1s"] = candle_timestamp # logger.debug(f"Created new 1s candle at {candle_timestamp}") # else: # # Update the current candle # current = self.current_candle["1s"] # price = float(tick["price"]) # # Update high and low # if price > current["high"]: # current["high"] = price # if price < current["low"]: # current["low"] = price # # Update close price and add volume # current["close"] = price # current["volume"] += float(tick["quantity"]) if "quantity" in tick else 0.0 # def _update_candles_for_timeframe(self, timeframe, tick): # """Update candles for a specific timeframe""" # # Skip 1s as it's handled separately # if timeframe == "1s": # return # # Convert timeframe to seconds # timeframe_seconds = self._timeframe_to_seconds(timeframe) # # Get the timestamp truncated to the timeframe interval # # e.g., for a 5m candle, the timestamp should be truncated to the nearest 5-minute mark # # Convert timestamp to datetime if it's not already # tick_timestamp = tick['timestamp'] # if isinstance(tick_timestamp, pd.Timestamp): # ts = tick_timestamp # else: # ts = pd.Timestamp(tick_timestamp) # # Truncate timestamp to nearest timeframe interval # timestamp = pd.Timestamp( # int(ts.timestamp() // timeframe_seconds * timeframe_seconds * 1_000_000_000) # ) # # Get the current candle for this timeframe # current_candle = self.current_candle[timeframe] # # If we have no current candle or the timestamp is different (new candle) # if current_candle is None or current_candle['timestamp'] != timestamp: # # If we have a current candle, add it to the candles list # if current_candle: # self.candles[timeframe].append(current_candle) # # Save to TimescaleDB if enabled # if self.use_timescaledb: # timescaledb_handler.upsert_candle(self.symbol, timeframe, current_candle) # # Create a new candle # current_candle = { # 'timestamp': timestamp, # 'open': tick['price'], # 'high': tick['price'], # 'low': tick['price'], # 'close': tick['price'], # 'volume': tick.get('quantity', 0) # } # # Update current candle # self.current_candle[timeframe] = current_candle # self.last_candle_timestamp[timeframe] = timestamp # else: # # Update existing candle # current_candle['high'] = max(current_candle['high'], tick['price']) # current_candle['low'] = min(current_candle['low'], tick['price']) # current_candle['close'] = tick['price'] # current_candle['volume'] += tick.get('quantity', 0) # # Limit the number of candles to avoid memory issues # max_candles = 1000 # if len(self.candles[timeframe]) > max_candles: # self.candles[timeframe] = self.candles[timeframe][-max_candles:] # def _timeframe_to_seconds(self, timeframe): # """Convert a timeframe string (e.g., '1m', '1h') to seconds""" # if timeframe == "1s": # return 1 # try: # # Extract the number and unit # match = re.match(r'(\d+)([smhdw])', timeframe) # if not match: # return None # num, unit = match.groups() # num = int(num) # # Convert to seconds # if unit == 's': # return num # elif unit == 'm': # return num * 60 # elif unit == 'h': # return num * 3600 # elif unit == 'd': # return num * 86400 # elif unit == 'w': # return num * 604800 # return None # except: # return None # def get_candles(self, timeframe, limit=None): # """Get candles for a given timeframe""" # if timeframe in self.candles: # candles = self.candles[timeframe] # # Add the current candle if it exists and isn't None # if timeframe in self.current_candle and self.current_candle[timeframe] is not None: # # Make a copy of the current candle # current_candle_copy = self.current_candle[timeframe].copy() # # Check if the current candle is newer than the last candle in the list # if not candles or current_candle_copy["timestamp"] > candles[-1]["timestamp"]: # candles = candles + [current_candle_copy] # # Apply limit if provided # if limit and len(candles) > limit: # return candles[-limit:] # return candles # return [] # def get_last_price(self): # """Get the last known price""" # if self.ticks: # return float(self.ticks[-1]["price"]) # return None # def load_historical_data(self, symbol, limit=1000): # """Load historical data for all timeframes""" # logger.info(f"Starting historical data load for {symbol} with limit {limit}") # # Clear existing data # self.ticks = [] # self.candles = {tf: [] for tf in self.timeframes} # self.current_candle = {tf: None for tf in self.timeframes} # # Try to load ticks from cache first # logger.info("Attempting to load from cache...") # cache_loaded = self._load_from_cache() # if cache_loaded: # logger.info("Successfully loaded data from cache") # else: # logger.info("No valid cache data found") # # Check if we have TimescaleDB enabled # if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: # logger.info("Attempting to fetch historical data from TimescaleDB") # loaded_from_db = False # # Load candles for each timeframe from TimescaleDB # for tf in self.timeframes: # try: # candles = timescaledb_handler.fetch_candles(symbol, tf, limit) # if candles: # self.candles[tf] = candles # loaded_from_db = True # logger.info(f"Loaded {len(candles)} {tf} candles from TimescaleDB") # else: # logger.info(f"No {tf} candles found in TimescaleDB") # except Exception as e: # logger.error(f"Error loading {tf} candles from TimescaleDB: {str(e)}") # if loaded_from_db: # logger.info("Successfully loaded historical data from TimescaleDB") # return True # else: # logger.info("TimescaleDB not available or disabled") # # If no TimescaleDB data and no cache, we need to get from Binance API # if not cache_loaded: # logger.info("Loading data from Binance API...") # # Create a BinanceHistoricalData instance # historical_data = BinanceHistoricalData() # # Load data for each timeframe # success_count = 0 # for tf in self.timeframes: # if tf != "1s": # Skip 1s since we'll generate it from ticks # try: # logger.info(f"Fetching {tf} candles for {symbol}...") # df = historical_data.get_historical_candles(symbol, self._timeframe_to_seconds(tf), limit) # if df is not None and not df.empty: # logger.info(f"Loaded {len(df)} {tf} candles from Binance API") # # Convert to our candle format and store # candles = [] # for _, row in df.iterrows(): # candle = { # 'timestamp': row['timestamp'], # 'open': row['open'], # 'high': row['high'], # 'low': row['low'], # 'close': row['close'], # 'volume': row['volume'] # } # candles.append(candle) # # Also save to TimescaleDB if enabled # if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: # timescaledb_handler.upsert_candle(symbol, tf, candle) # self.candles[tf] = candles # success_count += 1 # else: # logger.warning(f"No data returned for {tf} candles") # except Exception as e: # logger.error(f"Error loading {tf} candles: {str(e)}") # import traceback # logger.error(traceback.format_exc()) # logger.info(f"Successfully loaded {success_count} timeframes from Binance API") # # For 1s, load from API if possible or compute from first available timeframe # if "1s" in self.timeframes: # logger.info("Loading 1s candles...") # # Try to get 1s data from Binance # try: # df_1s = historical_data.get_historical_candles(symbol, 1, 300) # Only need recent 1s data # if df_1s is not None and not df_1s.empty: # logger.info(f"Loaded {len(df_1s)} recent 1s candles from Binance API") # # Convert to our candle format and store # candles_1s = [] # for _, row in df_1s.iterrows(): # candle = { # 'timestamp': row['timestamp'], # 'open': row['open'], # 'high': row['high'], # 'low': row['low'], # 'close': row['close'], # 'volume': row['volume'] # } # candles_1s.append(candle) # # Also save to TimescaleDB if enabled # if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: # timescaledb_handler.upsert_candle(symbol, "1s", candle) # self.candles["1s"] = candles_1s # except Exception as e: # logger.error(f"Error loading 1s candles: {str(e)}") # # If 1s data not available or failed to load, approximate from 1m data # if not self.candles.get("1s"): # logger.info("1s data not available, trying to approximate from 1m data...") # # If 1s data not available, we can approximate from 1m data # if "1m" in self.timeframes and self.candles["1m"]: # # For demonstration, just use the 1m candles as placeholders for 1s # # In a real implementation, you might want more sophisticated interpolation # logger.info("Using 1m candles as placeholders for 1s timeframe") # self.candles["1s"] = [] # # Take the most recent 5 minutes of 1m candles # recent_1m = self.candles["1m"][-5:] if self.candles["1m"] else [] # logger.info(f"Creating 1s approximations from {len(recent_1m)} 1m candles") # for candle_1m in recent_1m: # # Create 60 1s candles for each 1m candle # ts_base = candle_1m["timestamp"].timestamp() # for i in range(60): # # Create a 1s candle with interpolated values # candle_1s = { # 'timestamp': pd.Timestamp(int((ts_base + i) * 1_000_000_000)), # 'open': candle_1m['open'], # 'high': candle_1m['high'], # 'low': candle_1m['low'], # 'close': candle_1m['close'], # 'volume': candle_1m['volume'] / 60.0 # Distribute volume evenly # } # self.candles["1s"].append(candle_1s) # # Also save to TimescaleDB if enabled # if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled: # timescaledb_handler.upsert_candle(symbol, "1s", candle_1s) # logger.info(f"Created {len(self.candles['1s'])} approximated 1s candles") # else: # logger.warning("No 1m data available to approximate 1s candles from") # # Set the last candle of each timeframe as the current candle # for tf in self.timeframes: # if self.candles[tf]: # self.current_candle[tf] = self.candles[tf][-1].copy() # self.last_candle_timestamp[tf] = self.current_candle[tf]["timestamp"] # logger.debug(f"Set current candle for {tf}: {self.current_candle[tf]['timestamp']}") # # If we loaded ticks from cache, rebuild candles # if cache_loaded: # logger.info("Rebuilding candles from cached ticks...") # # Clear candles # self.candles = {tf: [] for tf in self.timeframes} # self.current_candle = {tf: None for tf in self.timeframes} # # Process each tick to rebuild the candles # for tick in self.ticks: # for tf in self.timeframes: # if tf == "1s": # self._update_1s_candle(tick) # else: # self._update_candles_for_timeframe(tf, tick) # logger.info("Finished rebuilding candles from ticks") # # Log final results # for tf in self.timeframes: # count = len(self.candles[tf]) # logger.info(f"Final {tf} candle count: {count}") # has_data = cache_loaded or any(self.candles[tf] for tf in self.timeframes) # logger.info(f"Historical data loading completed. Has data: {has_data}") # return has_data # def _try_cache_ticks(self): # """Try to save ticks to cache periodically""" # # Only save to cache every 100 ticks to avoid excessive disk I/O # if len(self.ticks) % 100 == 0: # try: # self._save_to_cache() # except Exception as e: # # Don't spam logs with cache errors, just log once every 1000 ticks # if len(self.ticks) % 1000 == 0: # logger.warning(f"Cache save failed at {len(self.ticks)} ticks: {str(e)}") # pass # Continue even if cache fails # class Position: # """Represents a trading position""" # def __init__(self, action, entry_price, amount, timestamp=None, trade_id=None, fee_rate=0.0002): # self.action = action # self.entry_price = entry_price # self.amount = amount # self.entry_timestamp = timestamp or datetime.now() # self.exit_timestamp = None # self.exit_price = None # self.pnl = None # self.is_open = True # self.trade_id = trade_id or str(uuid.uuid4())[:8] # self.fee_rate = fee_rate # self.paid_fee = entry_price * amount * fee_rate # Calculate entry fee # def close(self, exit_price, exit_timestamp=None): # """Close an open position""" # self.exit_price = exit_price # self.exit_timestamp = exit_timestamp or datetime.now() # self.is_open = False # # Calculate P&L # if self.action == "BUY": # price_diff = self.exit_price - self.entry_price # # Calculate fee for exit trade # exit_fee = exit_price * self.amount * self.fee_rate # self.paid_fee += exit_fee # Add exit fee to total paid fee # self.pnl = (price_diff * self.amount) - self.paid_fee # else: # SELL # price_diff = self.entry_price - self.exit_price # # Calculate fee for exit trade # exit_fee = exit_price * self.amount * self.fee_rate # self.paid_fee += exit_fee # Add exit fee to total paid fee # self.pnl = (price_diff * self.amount) - self.paid_fee # return self.pnl # class RealTimeChart: # def __init__(self, app=None, symbol='BTCUSDT', timeframe='1m', standalone=True, chart_title=None, # run_signal_interpreter=False, debug_mode=False, historical_candles=None, # extended_hours=False, enable_logging=True, agent=None, trading_env=None, # max_memory_usage=90, memory_check_interval=10, tick_update_interval=0.5, # chart_update_interval=1, performance_monitoring=False, show_volume=True, # show_indicators=True, custom_trades=None, port=8050, height=900, width=1200, # positions_callback=None, allow_synthetic_data=True, tick_storage=None): # """Initialize a real-time chart with support for multiple indicators and backtesting.""" # # Store parameters # self.symbol = symbol # self.timeframe = timeframe # self.debug_mode = debug_mode # self.standalone = standalone # self.chart_title = chart_title or f"{symbol} Real-Time Chart" # self.extended_hours = extended_hours # self.enable_logging = enable_logging # self.run_signal_interpreter = run_signal_interpreter # self.historical_candles = historical_candles # self.performance_monitoring = performance_monitoring # self.max_memory_usage = max_memory_usage # self.memory_check_interval = memory_check_interval # self.tick_update_interval = tick_update_interval # self.chart_update_interval = chart_update_interval # self.show_volume = show_volume # self.show_indicators = show_indicators # self.custom_trades = custom_trades # self.port = port # self.height = height # self.width = width # self.positions_callback = positions_callback # self.allow_synthetic_data = allow_synthetic_data # # Initialize interval store # self.interval_store = {'interval': 1} # Default to 1s timeframe # # Initialize trading components # self.agent = agent # self.trading_env = trading_env # # Initialize button styles for timeframe selection # self.button_style = { # 'background': '#343a40', # 'color': 'white', # 'border': 'none', # 'padding': '10px 20px', # 'margin': '0 5px', # 'borderRadius': '4px', # 'cursor': 'pointer' # } # self.active_button_style = { # 'background': '#007bff', # 'color': 'white', # 'border': 'none', # 'padding': '10px 20px', # 'margin': '0 5px', # 'borderRadius': '4px', # 'cursor': 'pointer', # 'fontWeight': 'bold' # } # # Initialize color schemes # self.colors = { # 'background': '#1e1e1e', # 'text': '#ffffff', # 'grid': '#333333', # 'candle_up': '#26a69a', # 'candle_down': '#ef5350', # 'volume_up': 'rgba(38, 166, 154, 0.3)', # 'volume_down': 'rgba(239, 83, 80, 0.3)', # 'ma': '#ffeb3b', # 'ema': '#29b6f6', # 'bollinger_bands': '#ff9800', # 'trades_buy': '#00e676', # 'trades_sell': '#ff1744' # } # # Initialize data storage # self.all_trades = [] # Store trades # self.positions = [] # Store open positions # self.latest_price = 0.0 # self.latest_volume = 0.0 # self.latest_timestamp = datetime.now() # self.current_balance = 100.0 # Starting balance # self.accumulative_pnl = 0.0 # Accumulated profit/loss # # Initialize trade rate counter # self.trade_count = 0 # self.start_time = time.time() # self.trades_per_second = 0 # self.trades_per_minute = 0 # self.trades_per_hour = 0 # # Initialize trade rate tracking variables # self.trade_times = [] # Store timestamps of recent trades for rate calculation # self.last_trade_rate_calculation = datetime.now() # self.trade_rate = {"per_second": 0, "per_minute": 0, "per_hour": 0} # # Initialize interactive components # self.app = app # # Create a new app if not provided # if self.app is None and standalone: # self.app = dash.Dash( # __name__, # external_stylesheets=[dbc.themes.DARKLY], # suppress_callback_exceptions=True # ) # # Initialize tick storage if not provided # if tick_storage is None: # # Check if TimescaleDB integration is enabled # use_timescaledb = TIMESCALEDB_ENABLED and timescaledb_handler is not None # # Create a new tick storage # self.tick_storage = TickStorage( # symbol=symbol, # timeframes=["1s", "1m", "5m", "15m", "1h", "4h", "1d"], # use_timescaledb=use_timescaledb # ) # # Load historical data immediately for cold start # logger.info(f"Loading historical data for {symbol} during chart initialization") # try: # data_loaded = self.tick_storage.load_historical_data(symbol) # if data_loaded: # logger.info(f"Successfully loaded historical data for {symbol}") # # Log what we have # for tf in ["1s", "1m", "5m", "15m", "1h"]: # candle_count = len(self.tick_storage.candles.get(tf, [])) # logger.info(f" {tf}: {candle_count} candles") # else: # logger.warning(f"Failed to load historical data for {symbol}") # except Exception as e: # logger.error(f"Error loading historical data during initialization: {str(e)}") # import traceback # logger.error(traceback.format_exc()) # else: # self.tick_storage = tick_storage # # Create layout and callbacks if app is provided # if self.app is not None: # # Create the layout # self.app.layout = self._create_layout() # # Register callbacks # self._setup_callbacks() # # Log initialization # if self.enable_logging: # logger.info(f"RealTimeChart initialized: {self.symbol} ({self.timeframe}) ") # def _create_layout(self): # return html.Div([ # # Header section with title and current price # html.Div([ # html.H1(f"{self.symbol} Real-Time Chart", className="display-4"), # # Current price ticker # html.Div([ # html.H4("Current Price:", style={"display": "inline-block", "marginRight": "10px"}), # html.H3(id="current-price", style={"display": "inline-block", "color": "#17a2b8"}), # html.Div([ # html.H5("Balance:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}), # html.H5(id="current-balance", style={"display": "inline-block", "color": "#28a745"}), # ], style={"display": "inline-block", "marginLeft": "40px"}), # html.Div([ # html.H5("Accumulated PnL:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}), # html.H5(id="accumulated-pnl", style={"display": "inline-block", "color": "#ffc107"}), # ], style={"display": "inline-block", "marginLeft": "40px"}), # # Add trade rate display # html.Div([ # html.H5("Trade Rate:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}), # html.Span([ # html.Span(id="trade-rate-second", style={"color": "#ff7f0e"}), # html.Span("/s, "), # html.Span(id="trade-rate-minute", style={"color": "#ff7f0e"}), # html.Span("/m, "), # html.Span(id="trade-rate-hour", style={"color": "#ff7f0e"}), # html.Span("/h") # ], style={"display": "inline-block"}), # ], style={"display": "inline-block", "marginLeft": "40px"}), # ], style={"textAlign": "center", "margin": "20px 0"}), # ], style={"textAlign": "center", "marginBottom": "20px"}), # # Add interval component for periodic updates # dcc.Interval( # id='interval-component', # interval=500, # in milliseconds # n_intervals=0 # ), # # Add timeframe selection buttons # html.Div([ # html.Button('1s', id='btn-1s', n_clicks=0, style=self.active_button_style), # html.Button('5s', id='btn-5s', n_clicks=0, style=self.button_style), # html.Button('15s', id='btn-15s', n_clicks=0, style=self.button_style), # html.Button('1m', id='btn-1m', n_clicks=0, style=self.button_style), # html.Button('5m', id='btn-5m', n_clicks=0, style=self.button_style), # html.Button('15m', id='btn-15m', n_clicks=0, style=self.button_style), # html.Button('1h', id='btn-1h', n_clicks=0, style=self.button_style), # ], style={"textAlign": "center", "marginBottom": "20px"}), # # Store for the selected timeframe # dcc.Store(id='interval-store', data={'interval': 1}), # # Chart content (without wrapper div to avoid callback issues) # dcc.Graph(id='live-chart', style={"height": "600px"}), # dcc.Graph(id='secondary-charts', style={"height": "500px"}), # html.Div(id='positions-list') # ]) # def _create_chart_and_controls(self): # """Create the chart and controls for the dashboard.""" # try: # # Get selected interval from the dashboard (default to 1s if not available) # interval_seconds = 1 # if hasattr(self, 'interval_store') and self.interval_store: # interval_seconds = self.interval_store.get('interval', 1) # # Create chart components # chart_div = html.Div([ # # Update chart with data for the selected interval # dcc.Graph( # id='live-chart', # figure=self._update_main_chart(interval_seconds), # style={"height": "600px"} # ), # # Update secondary charts # dcc.Graph( # id='secondary-charts', # figure=self._update_secondary_charts(), # style={"height": "500px"} # ), # # Update positions list # html.Div( # id='positions-list', # children=self._get_position_list_rows() # ) # ]) # return chart_div # except Exception as e: # logger.error(f"Error creating chart and controls: {str(e)}") # import traceback # logger.error(traceback.format_exc()) # # Return a simple error message as fallback # return html.Div(f"Error loading chart: {str(e)}", style={"color": "red", "padding": "20px"}) # def _setup_callbacks(self): # """Setup Dash callbacks for the real-time chart""" # if self.app is None: # return # try: # # Update chart with all components based on interval # @self.app.callback( # [ # Output('live-chart', 'figure'), # Output('secondary-charts', 'figure'), # Output('positions-list', 'children'), # Output('current-price', 'children'), # Output('current-balance', 'children'), # Output('accumulated-pnl', 'children'), # Output('trade-rate-second', 'children'), # Output('trade-rate-minute', 'children'), # Output('trade-rate-hour', 'children') # ], # [ # Input('interval-component', 'n_intervals'), # Input('interval-store', 'data') # ] # ) # def update_all(n_intervals, interval_data): # """Update all chart components""" # try: # # Get selected interval # interval_seconds = interval_data.get('interval', 1) if interval_data else 1 # # Update main chart - limit data for performance # main_chart = self._update_main_chart(interval_seconds) # # Update secondary charts - limit data for performance # secondary_charts = self._update_secondary_charts() # # Update positions list # positions_list = self._get_position_list_rows() # # Update current price and balance # current_price = f"${self.latest_price:.2f}" if self.latest_price else "Error" # current_balance = f"${self.current_balance:.2f}" # accumulated_pnl = f"${self.accumulative_pnl:.2f}" # # Calculate trade rates # trade_rate = self._calculate_trade_rate() # trade_rate_second = f"{trade_rate['per_second']:.1f}" # trade_rate_minute = f"{trade_rate['per_minute']:.1f}" # trade_rate_hour = f"{trade_rate['per_hour']:.1f}" # return (main_chart, secondary_charts, positions_list, # current_price, current_balance, accumulated_pnl, # trade_rate_second, trade_rate_minute, trade_rate_hour) # except Exception as e: # logger.error(f"Error in update_all callback: {str(e)}") # # Return empty/error states # import plotly.graph_objects as go # empty_fig = go.Figure() # empty_fig.add_annotation(text="Chart Loading...", xref="paper", yref="paper", x=0.5, y=0.5) # return (empty_fig, empty_fig, [], "Loading...", "$0.00", "$0.00", "0.0", "0.0", "0.0") # # Timeframe selection callbacks # @self.app.callback( # [Output('interval-store', 'data'), # Output('btn-1s', 'style'), Output('btn-5s', 'style'), Output('btn-15s', 'style'), # Output('btn-1m', 'style'), Output('btn-5m', 'style'), Output('btn-15m', 'style'), # Output('btn-1h', 'style')], # [Input('btn-1s', 'n_clicks'), Input('btn-5s', 'n_clicks'), Input('btn-15s', 'n_clicks'), # Input('btn-1m', 'n_clicks'), Input('btn-5m', 'n_clicks'), Input('btn-15m', 'n_clicks'), # Input('btn-1h', 'n_clicks')] # ) # def update_timeframe(n1s, n5s, n15s, n1m, n5m, n15m, n1h): # """Update selected timeframe based on button clicks""" # ctx = dash.callback_context # if not ctx.triggered: # # Default to 1s # styles = [self.active_button_style] + [self.button_style] * 6 # return {'interval': 1}, *styles # button_id = ctx.triggered[0]['prop_id'].split('.')[0] # # Map button to interval seconds # interval_map = { # 'btn-1s': 1, 'btn-5s': 5, 'btn-15s': 15, # 'btn-1m': 60, 'btn-5m': 300, 'btn-15m': 900, 'btn-1h': 3600 # } # selected_interval = interval_map.get(button_id, 1) # # Create styles - active for selected, normal for others # button_names = ['btn-1s', 'btn-5s', 'btn-15s', 'btn-1m', 'btn-5m', 'btn-15m', 'btn-1h'] # styles = [] # for name in button_names: # if name == button_id: # styles.append(self.active_button_style) # else: # styles.append(self.button_style) # return {'interval': selected_interval}, *styles # logger.info("Dash callbacks registered successfully") # except Exception as e: # logger.error(f"Error setting up callbacks: {str(e)}") # import traceback # logger.error(traceback.format_exc()) # def _calculate_trade_rate(self): # """Calculate trading rate per second, minute, and hour""" # try: # now = datetime.now() # current_time = time.time() # # Filter trades within different time windows # trades_last_second = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 1) # trades_last_minute = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 60) # trades_last_hour = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 3600) # return { # "per_second": trades_last_second, # "per_minute": trades_last_minute, # "per_hour": trades_last_hour # } # except Exception as e: # logger.warning(f"Error calculating trade rate: {str(e)}") # return {"per_second": 0.0, "per_minute": 0.0, "per_hour": 0.0} # def _update_secondary_charts(self): # """Create secondary charts for volume and indicators""" # try: # # Create subplots for secondary charts # fig = make_subplots( # rows=2, cols=1, # subplot_titles=['Volume', 'Technical Indicators'], # shared_xaxes=True, # vertical_spacing=0.1, # row_heights=[0.3, 0.7] # ) # # Get latest candles (limit for performance) # candles = self.tick_storage.candles.get("1m", [])[-100:] # Last 100 candles for performance # if not candles: # fig.add_annotation(text="No data available", xref="paper", yref="paper", x=0.5, y=0.5) # fig.update_layout( # title="Secondary Charts", # template="plotly_dark", # height=400 # ) # return fig # # Extract data # timestamps = [candle['timestamp'] for candle in candles] # volumes = [candle['volume'] for candle in candles] # closes = [candle['close'] for candle in candles] # # Volume chart # colors = ['#26a69a' if i == 0 or closes[i] >= closes[i-1] else '#ef5350' for i in range(len(closes))] # fig.add_trace( # go.Bar( # x=timestamps, # y=volumes, # name='Volume', # marker_color=colors, # showlegend=False # ), # row=1, col=1 # ) # # Technical indicators # if len(closes) >= 20: # # Simple moving average # sma_20 = pd.Series(closes).rolling(window=20).mean() # fig.add_trace( # go.Scatter( # x=timestamps, # y=sma_20, # name='SMA 20', # line=dict(color='#ffeb3b', width=2) # ), # row=2, col=1 # ) # # RSI calculation # if len(closes) >= 14: # rsi = self._calculate_rsi(closes, 14) # fig.add_trace( # go.Scatter( # x=timestamps, # y=rsi, # name='RSI', # line=dict(color='#29b6f6', width=2), # yaxis='y3' # ), # row=2, col=1 # ) # # Update layout # fig.update_layout( # title="Volume & Technical Indicators", # template="plotly_dark", # height=400, # showlegend=True, # legend=dict(x=0, y=1, bgcolor='rgba(0,0,0,0)') # ) # # Update y-axes # fig.update_yaxes(title="Volume", row=1, col=1) # fig.update_yaxes(title="Price", row=2, col=1) # return fig # except Exception as e: # logger.error(f"Error creating secondary charts: {str(e)}") # # Return empty figure on error # fig = go.Figure() # fig.add_annotation(text=f"Error: {str(e)}", xref="paper", yref="paper", x=0.5, y=0.5) # fig.update_layout(template="plotly_dark", height=400) # return fig # def _calculate_rsi(self, prices, period=14): # """Calculate RSI indicator""" # try: # prices = pd.Series(prices) # delta = prices.diff() # gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() # loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() # rs = gain / loss # rsi = 100 - (100 / (1 + rs)) # return rsi.fillna(50).tolist() # Fill NaN with neutral RSI value # except Exception: # return [50] * len(prices) # Return neutral RSI on error # def _get_position_list_rows(self): # """Get list of current positions for display""" # try: # if not self.positions: # return [html.Div("No open positions", style={"color": "#888", "padding": "10px"})] # rows = [] # for i, position in enumerate(self.positions): # try: # # Calculate current PnL # current_pnl = (self.latest_price - position.entry_price) * position.amount # if position.action.upper() == 'SELL': # current_pnl = -current_pnl # # Create position row # row = html.Div([ # html.Span(f"#{i+1}: ", style={"fontWeight": "bold"}), # html.Span(f"{position.action.upper()} ", # style={"color": "#00e676" if position.action.upper() == "BUY" else "#ff1744"}), # html.Span(f"{position.amount:.4f} @ ${position.entry_price:.2f} "), # html.Span(f"PnL: ${current_pnl:.2f}", # style={"color": "#00e676" if current_pnl >= 0 else "#ff1744"}) # ], style={"padding": "5px", "borderBottom": "1px solid #333"}) # rows.append(row) # except Exception as e: # logger.warning(f"Error formatting position {i}: {str(e)}") # return rows # except Exception as e: # logger.error(f"Error getting position list: {str(e)}") # return [html.Div("Error loading positions", style={"color": "red", "padding": "10px"})] # def add_trade(self, action, price, amount, timestamp=None, trade_id=None): # """Add a trade to the chart and update tracking""" # try: # if timestamp is None: # timestamp = datetime.now() # # Create trade record # trade = { # 'id': trade_id or str(uuid.uuid4()), # 'action': action.upper(), # 'price': float(price), # 'amount': float(amount), # 'timestamp': timestamp, # 'value': float(price) * float(amount) # } # # Add to trades list # self.all_trades.append(trade) # # Update trade rate tracking # self.trade_times.append(time.time()) # # Keep only last hour of trade times # cutoff_time = time.time() - 3600 # self.trade_times = [t for t in self.trade_times if t > cutoff_time] # # Update positions # if action.upper() in ['BUY', 'SELL']: # position = Position( # action=action.upper(), # entry_price=float(price), # amount=float(amount), # timestamp=timestamp, # trade_id=trade['id'] # ) # self.positions.append(position) # # Update balance and PnL # if action.upper() == 'BUY': # self.current_balance -= trade['value'] # else: # SELL # self.current_balance += trade['value'] # # Calculate PnL for this trade # if len(self.all_trades) > 1: # # Simple PnL calculation - more sophisticated logic could be added # last_opposite_trades = [t for t in reversed(self.all_trades[:-1]) # if t['action'] != action.upper()] # if last_opposite_trades: # last_trade = last_opposite_trades[0] # if action.upper() == 'SELL': # pnl = (float(price) - last_trade['price']) * float(amount) # else: # BUY # pnl = (last_trade['price'] - float(price)) * float(amount) # self.accumulative_pnl += pnl # logger.info(f"Added trade: {action.upper()} {amount} @ ${price:.2f}") # except Exception as e: # logger.error(f"Error adding trade: {str(e)}") # def _get_interval_key(self, interval_seconds): # """Convert interval seconds to timeframe key""" # if interval_seconds <= 1: # return "1s" # elif interval_seconds <= 5: # return "5s" if "5s" in self.tick_storage.timeframes else "1s" # elif interval_seconds <= 15: # return "15s" if "15s" in self.tick_storage.timeframes else "1m" # elif interval_seconds <= 60: # return "1m" # elif interval_seconds <= 300: # return "5m" # elif interval_seconds <= 900: # return "15m" # elif interval_seconds <= 3600: # return "1h" # elif interval_seconds <= 14400: # return "4h" # else: # return "1d" # def _update_main_chart(self, interval_seconds): # """Update the main chart for the specified interval""" # try: # # Convert interval seconds to timeframe key # interval_key = self._get_interval_key(interval_seconds) # # Get candles for this timeframe (limit to last 100 for performance) # candles = self.tick_storage.candles.get(interval_key, [])[-100:] # if not candles: # logger.warning(f"No candle data available for {interval_key}") # # Return empty figure with a message # fig = go.Figure() # fig.add_annotation( # text=f"No data available for {interval_key}", # xref="paper", yref="paper", # x=0.5, y=0.5, # showarrow=False, # font=dict(size=16, color="white") # ) # fig.update_layout( # title=f"{self.symbol} - {interval_key} Chart", # template="plotly_dark", # height=600 # ) # return fig # # Extract data from candles # timestamps = [candle['timestamp'] for candle in candles] # opens = [candle['open'] for candle in candles] # highs = [candle['high'] for candle in candles] # lows = [candle['low'] for candle in candles] # closes = [candle['close'] for candle in candles] # volumes = [candle['volume'] for candle in candles] # # Create candlestick chart # fig = go.Figure() # # Add candlestick trace # fig.add_trace(go.Candlestick( # x=timestamps, # open=opens, # high=highs, # low=lows, # close=closes, # name="Price", # increasing_line_color='#26a69a', # decreasing_line_color='#ef5350', # increasing_fillcolor='#26a69a', # decreasing_fillcolor='#ef5350' # )) # # Add trade markers if we have trades # if self.all_trades: # # Filter trades to match the current timeframe window # start_time = timestamps[0] if timestamps else datetime.now() - timedelta(hours=1) # end_time = timestamps[-1] if timestamps else datetime.now() # filtered_trades = [ # trade for trade in self.all_trades # if start_time <= trade['timestamp'] <= end_time # ] # if filtered_trades: # buy_trades = [t for t in filtered_trades if t['action'] == 'BUY'] # sell_trades = [t for t in filtered_trades if t['action'] == 'SELL'] # # Add BUY markers # if buy_trades: # fig.add_trace(go.Scatter( # x=[t['timestamp'] for t in buy_trades], # y=[t['price'] for t in buy_trades], # mode='markers', # marker=dict( # symbol='triangle-up', # size=12, # color='#00e676', # line=dict(color='white', width=1) # ), # name='BUY', # text=[f"BUY {t['amount']:.4f} @ ${t['price']:.2f}" for t in buy_trades], # hovertemplate='%{text}
Time: %{x}' # )) # # Add SELL markers # if sell_trades: # fig.add_trace(go.Scatter( # x=[t['timestamp'] for t in sell_trades], # y=[t['price'] for t in sell_trades], # mode='markers', # marker=dict( # symbol='triangle-down', # size=12, # color='#ff1744', # line=dict(color='white', width=1) # ), # name='SELL', # text=[f"SELL {t['amount']:.4f} @ ${t['price']:.2f}" for t in sell_trades], # hovertemplate='%{text}
Time: %{x}' # )) # # Add moving averages if we have enough data # if len(closes) >= 20: # # 20-period SMA # sma_20 = pd.Series(closes).rolling(window=20).mean() # fig.add_trace(go.Scatter( # x=timestamps, # y=sma_20, # name='SMA 20', # line=dict(color='#ffeb3b', width=1), # opacity=0.7 # )) # if len(closes) >= 50: # # 50-period SMA # sma_50 = pd.Series(closes).rolling(window=50).mean() # fig.add_trace(go.Scatter( # x=timestamps, # y=sma_50, # name='SMA 50', # line=dict(color='#ff9800', width=1), # opacity=0.7 # )) # # Update layout # fig.update_layout( # title=f"{self.symbol} - {interval_key} Chart ({len(candles)} candles)", # template="plotly_dark", # height=600, # xaxis_title="Time", # yaxis_title="Price ($)", # legend=dict( # yanchor="top", # y=0.99, # xanchor="left", # x=0.01, # bgcolor="rgba(0,0,0,0.5)" # ), # hovermode='x unified', # dragmode='pan' # ) # # Remove range slider for better performance # fig.update_layout(xaxis_rangeslider_visible=False) # # Update the latest price # if closes: # self.latest_price = closes[-1] # self.latest_timestamp = timestamps[-1] # return fig # except Exception as e: # logger.error(f"Error updating main chart: {str(e)}") # import traceback # logger.error(traceback.format_exc()) # # Return error figure # fig = go.Figure() # fig.add_annotation( # text=f"Chart Error: {str(e)}", # xref="paper", yref="paper", # x=0.5, y=0.5, # showarrow=False, # font=dict(size=16, color="red") # ) # fig.update_layout( # title="Chart Error", # template="plotly_dark", # height=600 # ) # return fig # def set_trading_env(self, trading_env): # """Set the trading environment to monitor for new trades""" # self.trading_env = trading_env # if hasattr(trading_env, 'add_trade_callback'): # trading_env.add_trade_callback(self.add_trade) # logger.info("Trading environment integrated with chart") # def set_agent(self, agent): # """Set the agent to monitor for trading decisions""" # self.agent = agent # logger.info("Agent integrated with chart") # def update_from_env(self, env_data): # """Update chart data from trading environment""" # try: # if 'latest_price' in env_data: # self.latest_price = env_data['latest_price'] # if 'balance' in env_data: # self.current_balance = env_data['balance'] # if 'pnl' in env_data: # self.accumulative_pnl = env_data['pnl'] # if 'trades' in env_data: # # Add any new trades # for trade in env_data['trades']: # if trade not in self.all_trades: # self.add_trade( # action=trade.get('action', 'HOLD'), # price=trade.get('price', self.latest_price), # amount=trade.get('amount', 0.1), # timestamp=trade.get('timestamp', datetime.now()), # trade_id=trade.get('id') # ) # except Exception as e: # logger.error(f"Error updating from environment: {str(e)}") # def get_latest_data(self): # """Get the latest data for external systems""" # return { # 'latest_price': self.latest_price, # 'latest_volume': self.latest_volume, # 'latest_timestamp': self.latest_timestamp, # 'current_balance': self.current_balance, # 'accumulative_pnl': self.accumulative_pnl, # 'positions': len(self.positions), # 'trade_count': len(self.all_trades), # 'trade_rate': self._calculate_trade_rate() # } # async def start_websocket(self): # """Start the websocket connection for real-time data""" # try: # logger.info("Starting websocket connection for real-time data") # # Start the websocket data fetching # websocket_url = "wss://stream.binance.com:9443/ws/ethusdt@ticker" # async def websocket_handler(): # """Handle websocket connection and data updates""" # try: # async with websockets.connect(websocket_url) as websocket: # logger.info(f"WebSocket connected for {self.symbol}") # message_count = 0 # async for message in websocket: # try: # data = json.loads(message) # # Update tick storage with new price data # tick = { # 'price': float(data['c']), # Current price # 'volume': float(data['v']), # Volume # 'timestamp': pd.Timestamp.now() # } # self.tick_storage.add_tick(tick) # # Update chart's latest price and volume # self.latest_price = float(data['c']) # self.latest_volume = float(data['v']) # self.latest_timestamp = pd.Timestamp.now() # message_count += 1 # # Log periodic updates # if message_count % 100 == 0: # logger.info(f"Received message #{message_count}") # logger.info(f"Processed {message_count} ticks, current price: ${self.latest_price:.2f}") # # Log candle counts # candle_count = len(self.tick_storage.candles.get("1s", [])) # logger.info(f"Current 1s candles count: {candle_count}") # except json.JSONDecodeError as e: # logger.warning(f"Failed to parse websocket message: {str(e)}") # except Exception as e: # logger.error(f"Error processing websocket message: {str(e)}") # except websockets.exceptions.ConnectionClosed: # logger.warning("WebSocket connection closed") # except Exception as e: # logger.error(f"WebSocket error: {str(e)}") # # Start the websocket handler in the background # await websocket_handler() # except Exception as e: # logger.error(f"Error starting websocket: {str(e)}") # import traceback # logger.error(traceback.format_exc()) # def run(self, host='127.0.0.1', port=8050, debug=False): # """Run the Dash app""" # try: # if self.app is None: # logger.error("No Dash app instance available") # return # logger.info("="*60) # logger.info("🔗 ACCESS WEB UI AT: http://localhost:8050/") # logger.info("📊 View live trading data and charts in your browser") # logger.info("="*60) # # Run the app - FIXED: Updated for newer Dash versions # self.app.run( # host=host, # port=port, # debug=debug, # use_reloader=False, # Disable reloader to avoid conflicts # threaded=True # Enable threading for better performance # ) # except Exception as e: # logger.error(f"Error running Dash app: {str(e)}") # import traceback # logger.error(traceback.format_exc())