import asyncio
import json
import logging
# Workaround for a PIL import problem that leads to plotly JSON serialization errors.
import os

# Select the non-interactive "Agg" backend through the MPLBACKEND environment variable.
os.environ['MPLBACKEND'] = 'Agg'

try:
    # Import PIL before plotly's I/O module, then set kaleido's default export format to PNG.
    import PIL.Image
    import plotly.io as pio
    pio.kaleido.scope.default_format = "png"
except Exception:
    # Best effort only: a missing package (ImportError) or any other failure
    # raised while applying the workaround is deliberately ignored.
    pass
from typing import Dict, List, Optional, Tuple, Union
import websockets
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import dash
from dash import html, dcc
from dash.dependencies import Input, Output
import pandas as pd
import numpy as np
from collections import deque
import time
from threading import Thread
import requests
import os
from datetime import datetime, timedelta
import pytz
import tzlocal
import threading
import random
import dash_bootstrap_components as dbc
import uuid
import ta
from sklearn.preprocessing import MinMaxScaler
import re
import psutil
import gc
import websocket
# psycopg2 is optional: remember whether it could be imported so the
# TimescaleDB integration can be switched off when it is missing.
try:
    import psycopg2
except ImportError:
    psycopg2 = None
    PSYCOPG2_AVAILABLE = False
else:
    PSYCOPG2_AVAILABLE = True

# TimescaleDB connection settings, each read from the environment variable of
# the same name with a built-in fallback value.
# NOTE(review): the fallbacks include a private-network host and a plaintext
# password; deployments should override them through the environment.
TIMESCALEDB_HOST = os.environ.get('TIMESCALEDB_HOST', '192.168.0.10')
TIMESCALEDB_PORT = int(os.environ.get('TIMESCALEDB_PORT', '5432'))
TIMESCALEDB_USER = os.environ.get('TIMESCALEDB_USER', 'postgres')
TIMESCALEDB_PASSWORD = os.environ.get('TIMESCALEDB_PASSWORD', 'timescaledbpass')
TIMESCALEDB_DB = os.environ.get('TIMESCALEDB_DB', 'candles')
# The integration is on only when the driver imported and the flag is exactly '1'.
TIMESCALEDB_ENABLED = PSYCOPG2_AVAILABLE and os.environ.get('TIMESCALEDB_ENABLED', '1') == '1'
class TimescaleDBHandler:
    """Handler for TimescaleDB operations for candle storage and retrieval.

    The handler owns a single psycopg2 connection (``self.conn``). When the
    integration is disabled or the connection cannot be established,
    ``self.enabled`` is False and every public method becomes a no-op that
    returns its "empty" value (``False`` / ``[]``).
    """

    def __init__(self):
        """Initialize TimescaleDB connection if enabled"""
        self.enabled = TIMESCALEDB_ENABLED
        self.conn = None
        if not self.enabled:
            if not PSYCOPG2_AVAILABLE:
                print("psycopg2 module not available. TimescaleDB integration disabled.")
            return
        try:
            # Connect to TimescaleDB
            self.conn = self._connect()
            print(f"Connected to TimescaleDB at {TIMESCALEDB_HOST}:{TIMESCALEDB_PORT}")
            # Ensure the candles table exists
            self._ensure_table()
            # _ensure_table() disables the handler on failure, so only report
            # success when it actually left the integration enabled.
            if self.enabled:
                print("TimescaleDB integration initialized successfully")
        except Exception as e:
            print(f"Error connecting to TimescaleDB: {str(e)}")
            self.enabled = False
            self.conn = None

    @staticmethod
    def _connect():
        """Open a new psycopg2 connection using the module-level settings."""
        return psycopg2.connect(
            host=TIMESCALEDB_HOST,
            port=TIMESCALEDB_PORT,
            user=TIMESCALEDB_USER,
            password=TIMESCALEDB_PASSWORD,
            dbname=TIMESCALEDB_DB
        )

    def _recover_connection(self):
        """Make the connection usable again after a failed statement.

        A failed statement leaves the open transaction aborted, so a rollback
        is tried first. Only when the rollback itself fails (or the connection
        reports itself closed) is the old connection closed and replaced, so
        the previous connection object is not leaked.
        """
        conn = self.conn
        if conn is not None:
            if not getattr(conn, 'closed', 0):
                try:
                    conn.rollback()
                    return
                except Exception as e:
                    print(f"TimescaleDB rollback failed, reconnecting: {str(e)}")
            try:
                conn.close()
            except Exception:
                # The connection is already unusable; nothing more to release.
                pass
        try:
            self.conn = self._connect()
        except Exception as e:
            # Keep the handler alive; the next call will try to recover again.
            print(f"Error reconnecting to TimescaleDB: {str(e)}")

    def _ensure_table(self):
        """Ensure the candles table exists with TimescaleDB hypertable.

        On any error the handler is disabled (``self.enabled = False``).
        """
        if not self.conn:
            return
        try:
            with self.conn.cursor() as cur:
                # Create the candles table if it doesn't exist
                cur.execute('''
                    CREATE TABLE IF NOT EXISTS candles (
                        symbol TEXT,
                        interval TEXT,
                        timestamp TIMESTAMPTZ,
                        open DOUBLE PRECISION,
                        high DOUBLE PRECISION,
                        low DOUBLE PRECISION,
                        close DOUBLE PRECISION,
                        volume DOUBLE PRECISION,
                        PRIMARY KEY (symbol, interval, timestamp)
                    );
                ''')
                # Check if the table is already a hypertable
                cur.execute('''
                    SELECT EXISTS (
                        SELECT 1 FROM timescaledb_information.hypertables
                        WHERE hypertable_name = 'candles'
                    );
                ''')
                is_hypertable = cur.fetchone()[0]
                # Convert to hypertable if not already done
                if not is_hypertable:
                    cur.execute('''
                        SELECT create_hypertable('candles', 'timestamp',
                            if_not_exists => TRUE,
                            migrate_data => TRUE
                        );
                    ''')
            self.conn.commit()
            print("TimescaleDB table structure verified")
        except Exception as e:
            print(f"Error setting up TimescaleDB tables: {str(e)}")
            self.enabled = False
            # Do not leave the connection sitting in an aborted transaction.
            try:
                self.conn.rollback()
            except Exception:
                pass

    def upsert_candle(self, symbol, interval, candle):
        """Insert or update a candle in TimescaleDB.

        Args:
            symbol: Value stored in the ``symbol`` column.
            interval: Value stored in the ``interval`` column.
            candle (dict): Must provide the keys 'timestamp', 'open', 'high',
                'low', 'close' and 'volume'.

        Returns:
            bool: True when the row was written and committed, False when the
            handler is disabled or the statement failed.
        """
        if not self.enabled or not self.conn:
            return False
        try:
            with self.conn.cursor() as cur:
                cur.execute('''
                    INSERT INTO candles (
                        symbol, interval, timestamp,
                        open, high, low, close, volume
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (symbol, interval, timestamp)
                    DO UPDATE SET
                        open = EXCLUDED.open,
                        high = EXCLUDED.high,
                        low = EXCLUDED.low,
                        close = EXCLUDED.close,
                        volume = EXCLUDED.volume
                ''', (
                    symbol, interval, candle['timestamp'],
                    candle['open'], candle['high'], candle['low'],
                    candle['close'], candle['volume']
                ))
            self.conn.commit()
            return True
        except Exception as e:
            print(f"Error upserting candle to TimescaleDB: {str(e)}")
            # Roll back (or reconnect) so the next call starts from a clean state
            self._recover_connection()
            return False

    def fetch_candles(self, symbol, interval, limit=1000):
        """Fetch candles from TimescaleDB.

        Args:
            symbol: Value matched against the ``symbol`` column.
            interval: Value matched against the ``interval`` column.
            limit (int): Maximum number of (most recent) rows to return.

        Returns:
            list[dict]: Candles ordered from oldest to newest, each with the
            keys 'timestamp', 'open', 'high', 'low', 'close', 'volume'.
            An empty list when the handler is disabled or the query failed.
        """
        if not self.enabled or not self.conn:
            return []
        columns = ('timestamp', 'open', 'high', 'low', 'close', 'volume')
        try:
            with self.conn.cursor() as cur:
                cur.execute('''
                    SELECT timestamp, open, high, low, close, volume
                    FROM candles
                    WHERE symbol = %s AND interval = %s
                    ORDER BY timestamp DESC
                    LIMIT %s
                ''', (symbol, interval, limit))
                rows = cur.fetchall()
            # The query returns newest first; reverse to get oldest first
            return [dict(zip(columns, row)) for row in reversed(rows)]
        except Exception as e:
            print(f"Error fetching candles from TimescaleDB: {str(e)}")
            # Roll back (or reconnect) so the next call starts from a clean state
            self._recover_connection()
            return []
class BinanceHistoricalData:
    """
    Class for fetching historical price data from Binance.

    Fetched candles are written to CSV files under a ``cache`` directory next
    to this module and reused for a short time instead of hitting the API.
    """

    # Seconds to wait for the Binance REST API before giving up on a request.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.base_url = "https://api.binance.com/api/v3"
        self.cache_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cache')
        os.makedirs(self.cache_dir, exist_ok=True)
        # Timestamp of last data update (most recent fetch of any symbol/interval)
        self.last_update = None
        # Timestamp of the last successful fetch per (symbol, interval), so that
        # refreshing one pair does not make another pair's cache look fresh.
        self._last_updates = {}

    def _cache_age_minutes(self, symbol, interval):
        """Return minutes since this (symbol, interval) was last fetched.

        Returns 60 when the pair has not been fetched by this instance yet,
        which is above the 15 minute freshness limit used by the caller.
        """
        fetched_at = self._last_updates.get((symbol, interval))
        if fetched_at is None:
            return 60
        return (datetime.now() - fetched_at).total_seconds() / 60

    def get_historical_candles(self, symbol, interval_seconds=3600, limit=1000):
        """
        Fetch historical candles from Binance API.
        Args:
            symbol (str): Trading pair symbol (e.g., "BTC/USDT")
            interval_seconds (int): Timeframe in seconds (e.g., 3600 for 1h);
                unknown values fall back to the "1h" interval
            limit (int): Number of candles to fetch
        Returns:
            pd.DataFrame: DataFrame with OHLCV data; cached data when the
            request fails and a cache exists, otherwise an empty DataFrame
        """
        # Convert interval_seconds to Binance interval format
        interval_map = {
            1: "1s",
            60: "1m",
            300: "5m",
            900: "15m",
            1800: "30m",
            3600: "1h",
            14400: "4h",
            86400: "1d"
        }
        interval = interval_map.get(interval_seconds, "1h")
        # Format symbol for Binance API (remove slash and make uppercase)
        formatted_symbol = symbol.replace("/", "").upper()
        # Check if we have cached data first
        cached_data = self._load_from_cache(formatted_symbol, interval)
        # If we have cached data that's recent enough, use it
        if cached_data is not None and len(cached_data) >= limit:
            # Only use cache if this pair was fetched less than 15 minutes ago
            if self._cache_age_minutes(formatted_symbol, interval) < 15:
                logger.info(f"Using cached historical data for {symbol} ({interval})")
                return cached_data
        try:
            # Build URL for klines endpoint
            url = f"{self.base_url}/klines"
            params = {
                "symbol": formatted_symbol,
                "interval": interval,
                "limit": limit
            }
            # Make the request; the timeout keeps a stalled connection from
            # blocking the caller forever
            response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            # Parse the response
            data = response.json()
            # Create dataframe
            df = pd.DataFrame(data, columns=[
                "timestamp", "open", "high", "low", "close", "volume",
                "close_time", "quote_asset_volume", "number_of_trades",
                "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
            ])
            # Convert timestamp to datetime
            df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
            # Convert price columns to float
            for col in ["open", "high", "low", "close", "volume"]:
                df[col] = df[col].astype(float)
            # Sort by timestamp
            df = df.sort_values("timestamp")
            # Save to cache for future use
            self._save_to_cache(df, formatted_symbol, interval)
            fetched_at = datetime.now()
            self.last_update = fetched_at
            self._last_updates[(formatted_symbol, interval)] = fetched_at
            logger.info(f"Fetched {len(df)} candles for {symbol} ({interval})")
            return df
        except Exception as e:
            logger.error(f"Error fetching historical data from Binance: {str(e)}")
            # Return cached data if we have it, even if it's not enough
            if cached_data is not None:
                logger.warning(f"Using cached data instead (may be incomplete)")
                return cached_data
            # Return empty dataframe on error
            return pd.DataFrame()

    def _get_cache_filename(self, symbol, interval):
        """Get filename for cache file"""
        return os.path.join(self.cache_dir, f"{symbol}_{interval}_candles.csv")

    def _load_from_cache(self, symbol, interval):
        """Load candles from cache file.

        Returns the cached DataFrame, or None when there is no cache file, the
        1s cache file is older than 10 minutes, or reading it failed.
        """
        try:
            cache_file = self._get_cache_filename(symbol, interval)
            if os.path.exists(cache_file):
                # For 1s interval, check if the cache is recent (less than 10 minutes old)
                if interval in ("1s", 1):
                    file_mod_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
                    time_diff = (datetime.now() - file_mod_time).total_seconds() / 60
                    if time_diff > 10:
                        logger.info("1s cache is older than 10 minutes, skipping load")
                        return None
                    logger.info(f"Using recent 1s cache (age: {time_diff:.1f} minutes)")
                df = pd.read_csv(cache_file)
                df["timestamp"] = pd.to_datetime(df["timestamp"])
                logger.info(f"Loaded {len(df)} candles from cache: {cache_file}")
                return df
        except Exception as e:
            logger.error(f"Error loading cached data: {str(e)}")
        return None

    def _save_to_cache(self, df, symbol, interval):
        """Save candles to cache file; returns True on success, False on error"""
        try:
            cache_file = self._get_cache_filename(symbol, interval)
            df.to_csv(cache_file, index=False)
            logger.info(f"Saved {len(df)} candles to cache: {cache_file}")
            return True
        except Exception as e:
            logger.error(f"Error saving to cache: {str(e)}")
            return False

    def get_recent_trades(self, symbol, limit=1000):
        """Get recent trades for a symbol.

        Returns a DataFrame of the API response with 'time' converted to
        datetime and 'price'/'qty' converted to float, or an empty DataFrame
        when the request or the conversion fails.
        """
        # Same symbol normalization as get_historical_candles
        formatted_symbol = symbol.replace("/", "").upper()
        try:
            url = f"{self.base_url}/trades"
            params = {
                "symbol": formatted_symbol,
                "limit": limit
            }
            response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()
            # Create dataframe
            df = pd.DataFrame(data)
            df["time"] = pd.to_datetime(df["time"], unit="ms")
            df["price"] = df["price"].astype(float)
            df["qty"] = df["qty"].astype(float)
            return df
        except Exception as e:
            logger.error(f"Error fetching recent trades: {str(e)}")
            return pd.DataFrame()
class MultiTimeframeDataInterface:
    """
    Enhanced Data Interface supporting:
    - Multiple trading pairs
    - Multiple timeframes per pair (1s, 1m, 1h, 1d + custom)
    - Technical indicators
    - Cross-timeframe normalization
    - Real-time data updates
    """

    def __init__(self, symbol=None, timeframes=None, data_dir="data"):
        """
        Initialize the data interface.
        Args:
            symbol (str): Trading pair symbol (e.g., "BTC/USDT")
            timeframes (list): List of timeframes to use (e.g., ['1m', '5m', '1h', '4h', '1d']);
                defaults to ['1h', '4h', '1d']
            data_dir (str): Directory to store/load datasets (created if missing)
        """
        self.symbol = symbol
        self.timeframes = timeframes or ['1h', '4h', '1d']
        self.data_dir = data_dir
        self.scalers = {}  # Store scalers for each timeframe
        # Initialize the historical data fetcher
        self.historical_data = BinanceHistoricalData()
        # Create data directory if it doesn't exist
        os.makedirs(self.data_dir, exist_ok=True)
        # Initialize empty dataframes for each timeframe
        self.dataframes = {tf: None for tf in self.timeframes}
        # Store timestamps of last updates per timeframe
        self.last_updates = {tf: None for tf in self.timeframes}
        # Timeframe mapping (string to seconds)
        self.timeframe_to_seconds = {
            '1s': 1,
            '1m': 60,
            '5m': 300,
            '15m': 900,
            '30m': 1800,
            '1h': 3600,
            '4h': 14400,
            '1d': 86400
        }
        # Log the effective timeframes (the argument itself may be None)
        logger.info(f"MultiTimeframeDataInterface initialized for {symbol} with timeframes {self.timeframes}")

    def get_data(self, timeframe='1h', n_candles=1000, refresh=False, add_indicators=True):
        """
        Fetch historical price data for a given timeframe with optional indicators.
        Args:
            timeframe (str): Timeframe to fetch data for; unknown names are
                fetched with a 3600 second interval
            n_candles (int): Number of candles to fetch
            refresh (bool): Force refresh of the data
            add_indicators (bool): Whether to add technical indicators
        Returns:
            pd.DataFrame: DataFrame with OHLCV data and indicators, or None
            when no data is available
        """
        # Check if we need to refresh. .get() is used so that a timeframe that
        # was not listed at construction time is treated as "not cached yet"
        # instead of raising KeyError.
        current_time = datetime.now()
        cached_df = self.dataframes.get(timeframe)
        cached_at = self.last_updates.get(timeframe)
        if (not refresh and
                cached_df is not None and
                cached_at is not None and
                (current_time - cached_at).total_seconds() < 60):
            logger.info(f"Using cached data for {self.symbol} {timeframe}")
            return cached_df
        interval_seconds = self.timeframe_to_seconds.get(timeframe, 3600)
        # Fetch data
        df = self.historical_data.get_historical_candles(
            symbol=self.symbol,
            interval_seconds=interval_seconds,
            limit=n_candles
        )
        if df is None or df.empty:
            logger.error(f"No data available for {self.symbol} {timeframe}")
            return None
        # Add indicators if requested
        if add_indicators:
            df = self.add_indicators(df)
        # Store in cache
        self.dataframes[timeframe] = df
        self.last_updates[timeframe] = current_time
        logger.info(f"Fetched and processed {len(df)} candles for {self.symbol} {timeframe}")
        return df

    def add_indicators(self, df):
        """
        Add comprehensive technical indicators to the dataframe.
        Args:
            df (pd.DataFrame): DataFrame with OHLCV data
                ('open', 'high', 'low', 'close', 'volume' columns are read)
        Returns:
            pd.DataFrame: Copy of the input with added technical indicators;
            NaN values are back-filled and any remaining NaN replaced by 0
        """
        # Make a copy to avoid modifying the original
        df_copy = df.copy()
        close = df_copy['close']
        high = df_copy['high']
        low = df_copy['low']
        volume = df_copy['volume']
        # Basic price indicators
        df_copy['returns'] = close.pct_change()
        df_copy['log_returns'] = np.log(close / close.shift(1))
        # Moving Averages
        df_copy['sma_7'] = ta.trend.sma_indicator(close, window=7)
        df_copy['sma_25'] = ta.trend.sma_indicator(close, window=25)
        df_copy['sma_99'] = ta.trend.sma_indicator(close, window=99)
        df_copy['ema_9'] = ta.trend.ema_indicator(close, window=9)
        df_copy['ema_21'] = ta.trend.ema_indicator(close, window=21)
        # MACD
        macd = ta.trend.MACD(close)
        df_copy['macd'] = macd.macd()
        df_copy['macd_signal'] = macd.macd_signal()
        df_copy['macd_diff'] = macd.macd_diff()
        # RSI
        df_copy['rsi'] = ta.momentum.rsi(close, window=14)
        # Bollinger Bands
        bollinger = ta.volatility.BollingerBands(close)
        df_copy['bb_high'] = bollinger.bollinger_hband()
        df_copy['bb_low'] = bollinger.bollinger_lband()
        df_copy['bb_pct'] = bollinger.bollinger_pband()
        # Stochastic Oscillator
        stoch = ta.momentum.StochasticOscillator(high, low, close)
        df_copy['stoch_k'] = stoch.stoch()
        df_copy['stoch_d'] = stoch.stoch_signal()
        # ATR - Average True Range
        df_copy['atr'] = ta.volatility.average_true_range(high, low, close, window=14)
        # Money Flow Index
        df_copy['mfi'] = ta.volume.money_flow_index(high, low, close, volume, window=14)
        # OBV - On-Balance Volume
        df_copy['obv'] = ta.volume.on_balance_volume(close, volume)
        # Ichimoku Cloud
        ichimoku = ta.trend.IchimokuIndicator(high, low)
        df_copy['ichimoku_a'] = ichimoku.ichimoku_a()
        df_copy['ichimoku_b'] = ichimoku.ichimoku_b()
        df_copy['ichimoku_base'] = ichimoku.ichimoku_base_line()
        df_copy['ichimoku_conv'] = ichimoku.ichimoku_conversion_line()
        # ADX - Average Directional Index
        adx = ta.trend.ADXIndicator(high, low, close)
        df_copy['adx'] = adx.adx()
        df_copy['adx_pos'] = adx.adx_pos()
        df_copy['adx_neg'] = adx.adx_neg()
        # VWAP - Volume Weighted Average Price, computed by hand as the
        # cumulative volume-weighted typical price over the whole frame
        df_copy['vwap'] = (volume * (high + low + close) / 3).cumsum() / volume.cumsum()
        # Fill NaN values (bfill() replaces the deprecated fillna(method='bfill'))
        df_copy = df_copy.bfill().fillna(0)
        return df_copy

    def get_multi_timeframe_data(self, timeframes=None, n_candles=1000, refresh=False, add_indicators=True):
        """
        Fetch data for multiple timeframes.
        Args:
            timeframes (list): List of timeframes to fetch (defaults to self.timeframes)
            n_candles (int): Number of candles to fetch for each timeframe
            refresh (bool): Force refresh of the data
            add_indicators (bool): Whether to add technical indicators
        Returns:
            dict: Dictionary of dataframes indexed by timeframe; timeframes
            without data are omitted
        """
        if timeframes is None:
            timeframes = self.timeframes
        result = {}
        for tf in timeframes:
            # For higher timeframes, we need fewer candles
            tf_candles = n_candles
            if tf == '4h':
                tf_candles = max(250, n_candles // 4)
            elif tf == '1d':
                tf_candles = max(100, n_candles // 24)
            df = self.get_data(timeframe=tf, n_candles=tf_candles, refresh=refresh, add_indicators=add_indicators)
            if df is not None and not df.empty:
                result[tf] = df
        return result

    def prepare_training_data(self, window_size=20, train_ratio=0.8, refresh=False):
        """
        Prepare training data from multiple timeframes.
        Args:
            window_size (int): Size of the sliding window
            train_ratio (float): Ratio of data to use for training
            refresh (bool): Whether to refresh the data
        Returns:
            tuple: (X_train, y_train, X_val, y_val, train_prices, val_prices)
            where X_train / X_val are dicts of window arrays keyed by
            timeframe; six Nones when no timeframe could be fetched
        """
        # Get data for all timeframes
        data_dict = self.get_multi_timeframe_data(refresh=refresh)
        if not data_dict:
            logger.error("Failed to fetch data for any timeframe")
            return None, None, None, None, None, None
        # Align all dataframes by timestamp (keep only the common time span)
        all_dfs = list(data_dict.values())
        min_date = max([df['timestamp'].min() for df in all_dfs])
        max_date = min([df['timestamp'].max() for df in all_dfs])
        aligned_dfs = {}
        for tf, df in data_dict.items():
            aligned_df = df[(df['timestamp'] >= min_date) & (df['timestamp'] <= max_date)]
            aligned_dfs[tf] = aligned_df
        # Choose the lowest timeframe as the reference for time alignment.
        # Only timeframes that were actually fetched are candidates, so a
        # failed fetch of the smallest configured timeframe cannot raise KeyError.
        reference_tf = min(aligned_dfs, key=lambda x: self.timeframe_to_seconds.get(x, 3600))
        reference_df = aligned_dfs[reference_tf]
        # Create sliding windows for each timeframe
        X_dict = {}
        for tf, df in aligned_dfs.items():
            # Drop timestamp and create numeric features
            features = df.drop('timestamp', axis=1).values
            # Ensure the feature array is 3D: [samples, window, features]
            X = np.array([features[i:i+window_size] for i in range(len(features)-window_size)])
            X_dict[tf] = X
        # Create target labels based on future price movements
        reference_prices = reference_df['close'].values
        future_prices = reference_prices[window_size:]
        current_prices = reference_prices[window_size-1:-1]
        # Calculate returns
        returns = (future_prices - current_prices) / current_prices
        # Create labels: 0=SELL, 1=HOLD, 2=BUY
        threshold = 0.0005  # 0.05% threshold
        y = np.zeros(len(returns), dtype=int)
        y[returns > threshold] = 2  # BUY
        y[returns < -threshold] = 0  # SELL
        y[(returns >= -threshold) & (returns <= threshold)] = 1  # HOLD
        # Split into training and validation sets. The split index comes from
        # the reference timeframe's labels and is applied to every timeframe.
        split_idx = int(len(y) * train_ratio)
        X_train_dict = {tf: X[:split_idx] for tf, X in X_dict.items()}
        X_val_dict = {tf: X[split_idx:] for tf, X in X_dict.items()}
        y_train = y[:split_idx]
        y_val = y[split_idx:]
        train_prices = reference_prices[window_size-1:window_size-1+split_idx]
        val_prices = reference_prices[window_size-1+split_idx:window_size-1+len(y)]
        logger.info(f"Prepared training data - Train: {len(y_train)}, Val: {len(y_val)}")
        return X_train_dict, y_train, X_val_dict, y_val, train_prices, val_prices

    def normalize_data(self, data_dict, fit=True):
        """
        Normalize data across all timeframes.
        Args:
            data_dict (dict): Dictionary of data arrays by timeframe
            fit (bool): Whether to fit new scalers or use existing ones
                (a scaler is always fitted for a timeframe that has none yet)
        Returns:
            dict: Dictionary of normalized data arrays; arrays that are
            neither 2D nor 3D are left out
        """
        result = {}
        for tf, data in data_dict.items():
            # For 3D data [samples, window, features]
            if len(data.shape) == 3:
                samples, window, features = data.shape
                reshaped = data.reshape(-1, features)
                if fit or tf not in self.scalers:
                    self.scalers[tf] = MinMaxScaler()
                    normalized = self.scalers[tf].fit_transform(reshaped)
                else:
                    normalized = self.scalers[tf].transform(reshaped)
                result[tf] = normalized.reshape(samples, window, features)
            # For 2D data [samples, features]
            elif len(data.shape) == 2:
                if fit or tf not in self.scalers:
                    self.scalers[tf] = MinMaxScaler()
                    result[tf] = self.scalers[tf].fit_transform(data)
                else:
                    result[tf] = self.scalers[tf].transform(data)
        return result

    def get_realtime_features(self, timeframes=None, window_size=20):
        """
        Get the most recent data for real-time prediction.
        Args:
            timeframes (list): List of timeframes to use (defaults to self.timeframes)
            window_size (int): Size of the sliding window
        Returns:
            dict: Dictionary of feature arrays for the latest window, each
            shaped [1, window_size, features]; timeframes with fewer than
            window_size rows are skipped
        """
        if timeframes is None:
            timeframes = self.timeframes
        # Get fresh data
        data_dict = self.get_multi_timeframe_data(timeframes=timeframes, refresh=True)
        result = {}
        for tf, df in data_dict.items():
            if len(df) < window_size:
                logger.warning(f"Not enough data for {tf} (need {window_size}, got {len(df)})")
                continue
            # Get the latest window
            latest_data = df.tail(window_size).drop('timestamp', axis=1).values
            # Add extra dimension to match model input shape [1, window_size, features]
            result[tf] = latest_data.reshape(1, window_size, -1)
        # Apply normalization using existing scalers
        if self.scalers:
            result = self.normalize_data(result, fit=False)
        return result

    def calculate_pnl(self, predictions, prices, position_size=1.0, fee_rate=0.0002):
        """
        Calculate PnL and win rate from predictions.
        Args:
            predictions (np.ndarray): Array of predicted actions (0=SELL, 1=HOLD, 2=BUY)
            prices (np.ndarray): Array of prices
            position_size (float): Size of each position
            fee_rate (float): Trading fee rate (default: 0.0002 for 0.02% per trade)
        Returns:
            tuple: (total_pnl, win_rate, trades); (0.0, 0.0, []) when either
            input has fewer than two elements
        """
        if len(predictions) < 2 or len(prices) < 2:
            return 0.0, 0.0, []
        # Each action needs a following price, so at most len(prices)-1 are used
        min_len = min(len(predictions), len(prices)-1)
        actions = predictions[:min_len]
        pnl = 0.0
        wins = 0
        trades = []
        for i in range(min_len):
            current_price = prices[i]
            next_price = prices[i+1]
            action = actions[i]
            # Skip HOLD actions
            if action == 1:
                continue
            price_change = (next_price - current_price) / current_price
            if action == 2:  # BUY
                # Calculate raw PnL
                raw_pnl = price_change * position_size
                # Exit fee is charged on the position value after the move
                exit_fee = position_size * (1 + price_change) * fee_rate
                trade_type = 'BUY'
            elif action == 0:  # SELL
                # Calculate raw PnL
                raw_pnl = -price_change * position_size
                # Exit fee is charged on the position value after the move
                exit_fee = position_size * (1 - price_change) * fee_rate
                trade_type = 'SELL'
            else:
                # Unknown action codes are ignored
                continue
            # Fees (entry and exit) and net PnL after fees
            entry_fee = position_size * fee_rate
            total_fees = entry_fee + exit_fee
            trade_pnl = raw_pnl - total_fees
            is_win = trade_pnl > 0
            pnl += trade_pnl
            wins += int(is_win)
            trades.append({
                'type': trade_type,
                'entry': float(current_price),  # Ensure serializable
                'exit': float(next_price),
                'raw_pnl': float(raw_pnl),
                'fees': float(total_fees),
                'pnl': float(trade_pnl),
                'win': bool(is_win),
                'timestamp': datetime.now().isoformat()  # Time the trade record was created
            })
        win_rate = wins / len(trades) if trades else 0.0
        return float(pnl), float(win_rate), trades
# Configure root logging: INFO level, with source file and line number in
# every record, written both to the console and to 'realtime_chart.log'.
logging.basicConfig(
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('realtime_chart.log')
    ],
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Neural Network integration is opt-in through ENABLE_NN_MODELS=1.
NN_ENABLED = os.environ.get('ENABLE_NN_MODELS', '0') == '1'
# Both are filled in later by the NN helper functions.
nn_orchestrator = None
nn_inference_thread = None

if NN_ENABLED:
    try:
        import sys
        # Make this file's directory importable so the NN package can be found.
        project_root = os.path.dirname(os.path.abspath(__file__))
        if project_root not in sys.path:
            sys.path.append(project_root)
        from NN.main import NeuralNetworkOrchestrator
        logger.info("Neural Network module enabled")
    except ImportError as e:
        # Without the module the NN features are switched off for this process.
        logger.warning(f"Failed to import Neural Network module, disabling NN features: {str(e)}")
        NN_ENABLED = False
# NN utility functions
def setup_neural_network():
    """Create the global neural network orchestrator and load its model.

    Does nothing and returns False when NN support is disabled. The
    configuration is read from NN_* environment variables with defaults.
    Any exception disables NN support (NN_ENABLED = False) and returns False.

    Returns:
        The value returned by the orchestrator's load_model(), or False.
    """
    global nn_orchestrator, NN_ENABLED
    if not NN_ENABLED:
        return False
    env = os.environ.get
    try:
        # Key order is kept as-is because the whole dict is written to the log.
        config = {
            'symbol': env('NN_SYMBOL', 'ETH/USDT'),
            'timeframes': env('NN_TIMEFRAMES', '1m,5m,1h,4h,1d').split(','),
            'window_size': int(env('NN_WINDOW_SIZE', '20')),
            'n_features': 5,  # OHLCV
            'output_size': int(env('NN_OUTPUT_SIZE', '3')),  # 3 for BUY/HOLD/SELL
            'model_dir': 'NN/models/saved',
            'data_dir': 'NN/data'
        }
        logger.info(f"Initializing Neural Network Orchestrator with config: {config}")
        nn_orchestrator = NeuralNetworkOrchestrator(config)
        # A failed load is only a warning: the orchestrator stays in place.
        model_loaded = nn_orchestrator.load_model()
        if not model_loaded:
            logger.warning("Failed to load neural network model. Using untrained model.")
        return model_loaded
    except Exception as e:
        logger.error(f"Error setting up neural network: {str(e)}")
        NN_ENABLED = False
        return False
def start_nn_inference_thread(interval_seconds):
    """Launch a daemon thread that runs NN inference every interval_seconds.

    Args:
        interval_seconds: Pause between two inference runs, in seconds.

    Returns:
        bool: True once the thread is started, False when NN support is
        disabled or the orchestrator has not been initialized.
    """
    global nn_inference_thread
    if not NN_ENABLED or nn_orchestrator is None:
        logger.warning("Cannot start inference thread - Neural Network not enabled or initialized")
        return False

    def broadcast_signal(result, charts):
        """Forward one inference result to every chart exposing add_nn_signal."""
        if 'action' not in result:
            return
        action = result['action']
        timestamp = datetime.fromisoformat(result['timestamp'].replace('Z', '+00:00'))
        # Prefer a single probability; otherwise look the action up in the
        # per-action mapping, if the result carries one.
        if 'probability' in result:
            probability = result['probability']
        elif 'probabilities' in result:
            probability = result['probabilities'].get(action, None)
        else:
            probability = None
        for chart in charts:
            if hasattr(chart, 'add_nn_signal'):
                chart.add_nn_signal(action, timestamp, probability)

    def inference_worker():
        """Loop forever: run inference, publish the signal, sleep."""
        import traceback
        model_type = os.environ.get('NN_MODEL_TYPE', 'cnn')
        timeframe = os.environ.get('NN_TIMEFRAME', '1h')
        logger.info(f"Starting neural network inference thread with {interval_seconds}s interval")
        logger.info(f"Using model type: {model_type}, timeframe: {timeframe}")
        # Give the charts a few seconds to come up first
        time.sleep(5)
        active_charts = []
        while True:
            try:
                # Look for a module-level 'charts' object until one is found
                if not active_charts and 'charts' in globals():
                    active_charts = globals()['charts']
                    logger.info(f"Found {len(active_charts)} active charts for NN signals")
                result = nn_orchestrator.run_inference_pipeline(
                    model_type=model_type,
                    timeframe=timeframe
                )
                if result:
                    logger.info(f"Neural network inference result: {result}")
                    if active_charts:
                        # A failure while publishing must not skip the regular sleep
                        try:
                            broadcast_signal(result, active_charts)
                        except Exception as e:
                            logger.error(f"Error adding NN signal to chart: {str(e)}")
                            logger.error(traceback.format_exc())
                time.sleep(interval_seconds)
            except Exception as e:
                logger.error(f"Error in inference thread: {str(e)}")
                logger.error(traceback.format_exc())
                time.sleep(5)  # Short back-off before the next attempt

    nn_inference_thread = threading.Thread(target=inference_worker, daemon=True)
    nn_inference_thread.start()
    return True
# Try to get local timezone, default to Sofia/EET if not available
try:
    local_timezone = tzlocal.get_localzone()
    # Get timezone name safely: pytz timezones expose `.zone`, zoneinfo.ZoneInfo
    # objects expose `.key`; anything else falls back to its string form.
    try:
        if hasattr(local_timezone, 'zone'):
            tz_name = local_timezone.zone
        elif hasattr(local_timezone, 'key'):
            tz_name = local_timezone.key
        else:
            tz_name = str(local_timezone)
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer swallowed here.
        tz_name = "Local"
    logger.info(f"Detected local timezone: {local_timezone} ({tz_name})")
except Exception as e:
    logger.warning(f"Could not detect local timezone: {str(e)}. Defaulting to Sofia/EET")
    local_timezone = pytz.timezone('Europe/Sofia')
    tz_name = "Europe/Sofia"
def convert_to_local_time(timestamp):
    """Return `timestamp` as a datetime in the module's local timezone.

    pandas Timestamps, numpy datetime64 values and strings are first turned
    into Python datetimes; any other value is used as given. A value without
    timezone information is treated as UTC. When the conversion raises, the
    error is logged and the input is returned unchanged.
    """
    try:
        dt = timestamp
        if isinstance(timestamp, pd.Timestamp):
            dt = timestamp.to_pydatetime()
        elif isinstance(timestamp, np.datetime64):
            dt = pd.Timestamp(timestamp).to_pydatetime()
        elif isinstance(timestamp, str):
            dt = pd.to_datetime(timestamp).to_pydatetime()
        # Naive datetimes are assumed to be UTC
        aware_dt = dt if dt.tzinfo is not None else dt.replace(tzinfo=pytz.UTC)
        return aware_dt.astimezone(local_timezone)
    except Exception as e:
        logger.error(f"Error converting timestamp to local time: {str(e)}")
        return timestamp
# Initialize TimescaleDB handler - only once, at module level.
# When TIMESCALEDB_ENABLED is false no handler is constructed and the name is None.
timescaledb_handler = TimescaleDBHandler() if TIMESCALEDB_ENABLED else None
class TickStorage:
def __init__(self, symbol, timeframes=None, use_timescaledb=False):
    """Set up empty tick and candle containers for one symbol.

    Args:
        symbol (str): Trading pair; any "/" is removed for directory and file names.
        timeframes (list, optional): Candle timeframes to maintain. Defaults to
            ["1s", "5m", "15m", "1h", "4h", "1d"].
        use_timescaledb (bool): Stored on the instance and reported at start-up.
    """
    compact_symbol = symbol.replace("/", "")
    self.symbol = symbol
    self.use_timescaledb = use_timescaledb
    self.max_ticks = 10000  # Upper bound on ticks kept in memory
    self.timeframes = timeframes or ["1s", "5m", "15m", "1h", "4h", "1d"]
    # Per-timeframe state: finished candles, the candle in progress, last candle time
    self.ticks = []
    self.candles = {tf: [] for tf in self.timeframes}
    self.current_candle = dict.fromkeys(self.timeframes)
    self.last_candle_timestamp = dict.fromkeys(self.timeframes)
    # Tick cache lives in <cwd>/cache/<SYMBOL>/<SYMBOL>_ticks.json
    self.cache_dir = os.path.join(os.getcwd(), "cache", compact_symbol)
    self.cache_path = os.path.join(self.cache_dir, f"{compact_symbol}_ticks.json")
    os.makedirs(self.cache_dir, exist_ok=True)
    logger.info(f"Creating new tick storage for {symbol} with timeframes {self.timeframes}")
    logger.info(f"Cache directory: {self.cache_dir}")
    logger.info(f"Cache file: {self.cache_path}")
    if not use_timescaledb:
        logger.info(f"TickStorage: TimescaleDB integration is DISABLED for {symbol}")
    else:
        print(f"TickStorage: TimescaleDB integration is ENABLED for {symbol}")
def _save_to_cache(self):
    """Write the most recent ticks (at most 5000) to the JSON cache file.

    Timestamps that offer ``isoformat()`` are stored as ISO strings; other
    timestamp values are written unchanged. Errors are logged, not raised.
    """
    try:
        # Cap the file size by keeping only the newest 5000 ticks
        recent_ticks = self.ticks if len(self.ticks) <= 5000 else self.ticks[-5000:]
        payload = []
        for tick in recent_ticks:
            entry = tick.copy()
            stamp = tick['timestamp']
            # pandas Timestamps and other datetime-like values become ISO
            # strings; strings and numbers stay exactly as copied
            if isinstance(stamp, pd.Timestamp) or hasattr(stamp, 'isoformat'):
                entry['timestamp'] = stamp.isoformat()
            payload.append(entry)
        with open(self.cache_path, 'w') as f:
            json.dump(payload, f)
        logger.debug(f"Saved {len(payload)} ticks to cache")
    except Exception as e:
        logger.error(f"Error saving ticks to cache: {e}")
def _load_from_cache(self):
    """Load ticks from cache if available.

    The cache file is used only when it exists, is at most 10 minutes old and
    contains at least one tick. String timestamps are parsed into pandas
    Timestamps (falling back to the current time when parsing fails);
    non-string timestamps are interpreted as milliseconds.

    Returns:
        bool: True when ``self.ticks`` was replaced with the cached ticks,
        False otherwise (including on any read/parse error).
    """
    if not os.path.exists(self.cache_path):
        return False
    try:
        # Check if the cache file is recent (< 10 minutes old)
        cache_age = time.time() - os.path.getmtime(self.cache_path)
        if cache_age > 600:  # 10 minutes in seconds
            logger.warning(f"Cache file is {cache_age:.1f} seconds old (>10 min). Not using it.")
            return False
        with open(self.cache_path, 'r') as f:
            cached_ticks = json.load(f)
        if cached_ticks:
            # Convert ISO strings back to pandas Timestamps
            processed_ticks = []
            for tick in cached_ticks:
                processed_tick = tick.copy()
                if isinstance(tick['timestamp'], str):
                    try:
                        processed_tick['timestamp'] = pd.Timestamp(tick['timestamp'])
                    except Exception:
                        # If parsing fails, use current time. `Exception` rather
                        # than a bare except, so KeyboardInterrupt/SystemExit
                        # still propagate.
                        processed_tick['timestamp'] = pd.Timestamp.now()
                else:
                    # Convert to pandas Timestamp if it's a number (milliseconds)
                    processed_tick['timestamp'] = pd.Timestamp(tick['timestamp'], unit='ms')
                processed_ticks.append(processed_tick)
            self.ticks = processed_ticks
            logger.info(f"Loaded {len(cached_ticks)} ticks from cache")
            return True
    except Exception as e:
        logger.error(f"Error loading ticks from cache: {e}")
    return False
def add_tick(self, tick=None, price=None, volume=None, timestamp=None):
    """Add a tick to the storage and update candles for all timeframes.

    Two calling conventions are supported: a ``tick`` dict (new interface)
    or separate ``price`` / ``volume`` / ``timestamp`` arguments (older
    interface). Both go through the same timestamp normalisation, so a
    numeric timestamp is always interpreted as epoch milliseconds.

    Args:
        tick (dict, optional): Tick with ``'price'``, ``'timestamp'`` and an
            optional ``'quantity'`` (defaults to 0).
        price (float, optional): Price of the tick (older interface).
        volume (float, optional): Volume of the tick (older interface).
        timestamp (optional): ``pd.Timestamp``, datetime, parseable string,
            or epoch milliseconds. When omitted, the current time is used.
    """
    if isinstance(tick, dict):
        # New interface: everything comes from the tick object.
        price = tick['price']
        volume = tick.get('quantity', 0)
        timestamp = tick['timestamp']
    elif price is None:
        logger.error("Invalid tick: must provide either a tick dict or price")
        return

    # Normalise the timestamp to a pandas Timestamp in one place.
    if timestamp is None:
        # pd.Timestamp(None) is NaT, which the candle updaters cannot bucket.
        # Use naive UTC so the default matches what the epoch-millisecond
        # branch below produces.
        timestamp = pd.Timestamp.now(tz="UTC").tz_localize(None)
    elif not isinstance(timestamp, pd.Timestamp):
        if isinstance(timestamp, (int, float)):
            # Numeric timestamps are epoch milliseconds.
            timestamp = pd.Timestamp(timestamp, unit='ms')
        else:
            # Strings and datetime objects.
            timestamp = pd.Timestamp(timestamp)

    tick_obj = {
        'price': float(price),
        'quantity': float(volume) if volume is not None else 0.0,
        'timestamp': timestamp
    }
    self.ticks.append(tick_obj)

    # Limit the number of ticks to avoid memory issues
    if len(self.ticks) > self.max_ticks:
        self.ticks = self.ticks[-self.max_ticks:]

    # Update candles for all timeframes
    for timeframe in self.timeframes:
        if timeframe == "1s":
            self._update_1s_candle(tick_obj)
        else:
            self._update_candles_for_timeframe(timeframe, tick_obj)

    # Cache to disk periodically
    self._try_cache_ticks()
def _update_1s_candle(self, tick):
"""Update the 1-second candle with the new tick"""
# Get timestamp for the start of the current second
tick_timestamp = tick['timestamp']
candle_timestamp = pd.Timestamp(int(tick_timestamp.timestamp() // 1 * 1_000_000_000))
# Check if we need to create a new candle
if self.current_candle["1s"] is None or self.current_candle["1s"]["timestamp"] != candle_timestamp:
# If we have a current candle, finalize it and add to candles list
if self.current_candle["1s"] is not None:
# Add the completed candle to the list
self.candles["1s"].append(self.current_candle["1s"])
# Limit the number of stored candles to prevent memory issues
if len(self.candles["1s"]) > 3600: # Keep last hour of 1s candles
self.candles["1s"] = self.candles["1s"][-3600:]
# Store in TimescaleDB if enabled
if self.use_timescaledb:
timescaledb_handler.upsert_candle(
self.symbol, "1s", self.current_candle["1s"]
)
# Log completed candle for debugging
logger.debug(f"Completed 1s candle: {self.current_candle['1s']['timestamp']} - Close: {self.current_candle['1s']['close']}")
# Create a new candle
self.current_candle["1s"] = {
"timestamp": candle_timestamp,
"open": float(tick["price"]),
"high": float(tick["price"]),
"low": float(tick["price"]),
"close": float(tick["price"]),
"volume": float(tick["quantity"]) if "quantity" in tick else 0.0
}
# Update last candle timestamp
self.last_candle_timestamp["1s"] = candle_timestamp
logger.debug(f"Created new 1s candle at {candle_timestamp}")
else:
# Update the current candle
current = self.current_candle["1s"]
price = float(tick["price"])
# Update high and low
if price > current["high"]:
current["high"] = price
if price < current["low"]:
current["low"] = price
# Update close price and add volume
current["close"] = price
current["volume"] += float(tick["quantity"]) if "quantity" in tick else 0.0
def _update_candles_for_timeframe(self, timeframe, tick):
"""Update candles for a specific timeframe"""
# Skip 1s as it's handled separately
if timeframe == "1s":
return
# Convert timeframe to seconds
timeframe_seconds = self._timeframe_to_seconds(timeframe)
# Get the timestamp truncated to the timeframe interval
# e.g., for a 5m candle, the timestamp should be truncated to the nearest 5-minute mark
# Convert timestamp to datetime if it's not already
tick_timestamp = tick['timestamp']
if isinstance(tick_timestamp, pd.Timestamp):
ts = tick_timestamp
else:
ts = pd.Timestamp(tick_timestamp)
# Truncate timestamp to nearest timeframe interval
timestamp = pd.Timestamp(
int(ts.timestamp() // timeframe_seconds * timeframe_seconds * 1_000_000_000)
)
# Get the current candle for this timeframe
current_candle = self.current_candle[timeframe]
# If we have no current candle or the timestamp is different (new candle)
if current_candle is None or current_candle['timestamp'] != timestamp:
# If we have a current candle, add it to the candles list
if current_candle:
self.candles[timeframe].append(current_candle)
# Save to TimescaleDB if enabled
if self.use_timescaledb:
timescaledb_handler.upsert_candle(self.symbol, timeframe, current_candle)
# Create a new candle
current_candle = {
'timestamp': timestamp,
'open': tick['price'],
'high': tick['price'],
'low': tick['price'],
'close': tick['price'],
'volume': tick.get('quantity', 0)
}
# Update current candle
self.current_candle[timeframe] = current_candle
self.last_candle_timestamp[timeframe] = timestamp
else:
# Update existing candle
current_candle['high'] = max(current_candle['high'], tick['price'])
current_candle['low'] = min(current_candle['low'], tick['price'])
current_candle['close'] = tick['price']
current_candle['volume'] += tick.get('quantity', 0)
# Limit the number of candles to avoid memory issues
max_candles = 1000
if len(self.candles[timeframe]) > max_candles:
self.candles[timeframe] = self.candles[timeframe][-max_candles:]
def _timeframe_to_seconds(self, timeframe):
"""Convert a timeframe string (e.g., '1m', '1h') to seconds"""
if timeframe == "1s":
return 1
try:
# Extract the number and unit
match = re.match(r'(\d+)([smhdw])', timeframe)
if not match:
return None
num, unit = match.groups()
num = int(num)
# Convert to seconds
if unit == 's':
return num
elif unit == 'm':
return num * 60
elif unit == 'h':
return num * 3600
elif unit == 'd':
return num * 86400
elif unit == 'w':
return num * 604800
return None
except:
return None
def get_candles(self, timeframe, limit=None):
    """Return the candles stored for ``timeframe``, newest last.

    A copy of the in-progress candle is appended when it is newer than the
    last stored candle (or when nothing is stored yet). With a truthy
    ``limit`` only the last ``limit`` candles are returned. Unknown
    timeframes yield an empty list.
    """
    if timeframe not in self.candles:
        return []

    series = self.candles[timeframe]
    live = self.current_candle.get(timeframe)
    if live is not None:
        snapshot = live.copy()
        # Only append the running candle if it is not already represented.
        if not series or snapshot["timestamp"] > series[-1]["timestamp"]:
            series = series + [snapshot]

    if limit and len(series) > limit:
        return series[-limit:]
    return series
def get_last_price(self):
    """Return the price of the most recent tick as a float, or None when no ticks are stored."""
    if not self.ticks:
        return None
    newest_tick = self.ticks[-1]
    return float(newest_tick["price"])
def load_historical_data(self, symbol, limit=1000):
    """Load historical data for all timeframes.

    Resets ticks, candles and current candles, then tries the sources in
    this order:

    1. The on-disk tick cache (``self._load_from_cache``).
    2. TimescaleDB, when enabled; if any timeframe returns candles the
       method returns True immediately.
    3. The Binance API (``BinanceHistoricalData``), only when the cache
       was not loaded. 1s candles are requested separately and, if still
       missing, approximated from the last five 1m candles.

    When ticks came from the cache, all candles are rebuilt from those
    ticks at the end.

    Args:
        symbol: Symbol passed to the TimescaleDB and Binance fetchers.
        limit: Maximum number of candles requested per timeframe.

    Returns:
        bool: True when the cache was loaded or at least one timeframe
        ended up with candles.
    """
    # NOTE(review): the original indentation of this method was lost; the
    # nesting below is a reconstruction - confirm against version control.
    logger.info(f"Starting historical data load for {symbol} with limit {limit}")
    # Clear existing data
    self.ticks = []
    self.candles = {tf: [] for tf in self.timeframes}
    self.current_candle = {tf: None for tf in self.timeframes}
    # Try to load ticks from cache first
    logger.info("Attempting to load from cache...")
    cache_loaded = self._load_from_cache()
    if cache_loaded:
        logger.info("Successfully loaded data from cache")
    else:
        logger.info("No valid cache data found")
    # Check if we have TimescaleDB enabled
    if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled:
        logger.info("Attempting to fetch historical data from TimescaleDB")
        loaded_from_db = False
        # Load candles for each timeframe from TimescaleDB
        for tf in self.timeframes:
            try:
                candles = timescaledb_handler.fetch_candles(symbol, tf, limit)
                if candles:
                    self.candles[tf] = candles
                    loaded_from_db = True
                    logger.info(f"Loaded {len(candles)} {tf} candles from TimescaleDB")
                else:
                    logger.info(f"No {tf} candles found in TimescaleDB")
            except Exception as e:
                logger.error(f"Error loading {tf} candles from TimescaleDB: {str(e)}")
        if loaded_from_db:
            # Early exit: current candles are not seeded and cached ticks
            # are not replayed on this path.
            logger.info("Successfully loaded historical data from TimescaleDB")
            return True
    else:
        logger.info("TimescaleDB not available or disabled")
    # If no TimescaleDB data and no cache, we need to get from Binance API
    if not cache_loaded:
        logger.info("Loading data from Binance API...")
        # Create a BinanceHistoricalData instance
        historical_data = BinanceHistoricalData()
        # Load data for each timeframe
        success_count = 0
        for tf in self.timeframes:
            if tf != "1s":  # Skip 1s since we'll generate it from ticks
                try:
                    logger.info(f"Fetching {tf} candles for {symbol}...")
                    df = historical_data.get_historical_candles(symbol, self._timeframe_to_seconds(tf), limit)
                    if df is not None and not df.empty:
                        logger.info(f"Loaded {len(df)} {tf} candles from Binance API")
                        # Convert to our candle format and store
                        candles = []
                        for _, row in df.iterrows():
                            candle = {
                                'timestamp': row['timestamp'],
                                'open': row['open'],
                                'high': row['high'],
                                'low': row['low'],
                                'close': row['close'],
                                'volume': row['volume']
                            }
                            candles.append(candle)
                            # Also save to TimescaleDB if enabled
                            if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled:
                                timescaledb_handler.upsert_candle(symbol, tf, candle)
                        self.candles[tf] = candles
                        success_count += 1
                    else:
                        logger.warning(f"No data returned for {tf} candles")
                except Exception as e:
                    logger.error(f"Error loading {tf} candles: {str(e)}")
                    import traceback
                    logger.error(traceback.format_exc())
        logger.info(f"Successfully loaded {success_count} timeframes from Binance API")
        # For 1s, load from API if possible or compute from first available timeframe
        if "1s" in self.timeframes:
            logger.info("Loading 1s candles...")
            # Try to get 1s data from Binance
            try:
                df_1s = historical_data.get_historical_candles(symbol, 1, 300)  # Only need recent 1s data
                if df_1s is not None and not df_1s.empty:
                    logger.info(f"Loaded {len(df_1s)} recent 1s candles from Binance API")
                    # Convert to our candle format and store
                    candles_1s = []
                    for _, row in df_1s.iterrows():
                        candle = {
                            'timestamp': row['timestamp'],
                            'open': row['open'],
                            'high': row['high'],
                            'low': row['low'],
                            'close': row['close'],
                            'volume': row['volume']
                        }
                        candles_1s.append(candle)
                        # Also save to TimescaleDB if enabled
                        if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled:
                            timescaledb_handler.upsert_candle(symbol, "1s", candle)
                    self.candles["1s"] = candles_1s
            except Exception as e:
                logger.error(f"Error loading 1s candles: {str(e)}")
            # If 1s data not available or failed to load, approximate from 1m data
            if not self.candles.get("1s"):
                logger.info("1s data not available, trying to approximate from 1m data...")
                # If 1s data not available, we can approximate from 1m data
                if "1m" in self.timeframes and self.candles["1m"]:
                    # For demonstration, just use the 1m candles as placeholders for 1s
                    # In a real implementation, you might want more sophisticated interpolation
                    logger.info("Using 1m candles as placeholders for 1s timeframe")
                    self.candles["1s"] = []
                    # Take the most recent 5 minutes of 1m candles
                    recent_1m = self.candles["1m"][-5:] if self.candles["1m"] else []
                    logger.info(f"Creating 1s approximations from {len(recent_1m)} 1m candles")
                    for candle_1m in recent_1m:
                        # Create 60 1s candles for each 1m candle
                        ts_base = candle_1m["timestamp"].timestamp()
                        for i in range(60):
                            # Each placeholder repeats the minute's OHLC values;
                            # only the timestamp and volume differ.
                            candle_1s = {
                                'timestamp': pd.Timestamp(int((ts_base + i) * 1_000_000_000)),
                                'open': candle_1m['open'],
                                'high': candle_1m['high'],
                                'low': candle_1m['low'],
                                'close': candle_1m['close'],
                                'volume': candle_1m['volume'] / 60.0  # Distribute volume evenly
                            }
                            self.candles["1s"].append(candle_1s)
                            # Also save to TimescaleDB if enabled
                            # NOTE(review): this persists synthetic 1s candles
                            # alongside real ones - confirm that is intended.
                            if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled:
                                timescaledb_handler.upsert_candle(symbol, "1s", candle_1s)
                    logger.info(f"Created {len(self.candles['1s'])} approximated 1s candles")
                else:
                    logger.warning("No 1m data available to approximate 1s candles from")
        # Set the last candle of each timeframe as the current candle
        # NOTE(review): the last candle stays in self.candles[tf] and a copy
        # becomes current_candle[tf]; the candle updaters append the current
        # candle on rollover, so this bar may end up stored twice - confirm.
        for tf in self.timeframes:
            if self.candles[tf]:
                self.current_candle[tf] = self.candles[tf][-1].copy()
                self.last_candle_timestamp[tf] = self.current_candle[tf]["timestamp"]
                logger.debug(f"Set current candle for {tf}: {self.current_candle[tf]['timestamp']}")
    # If we loaded ticks from cache, rebuild candles
    if cache_loaded:
        logger.info("Rebuilding candles from cached ticks...")
        # Clear candles
        self.candles = {tf: [] for tf in self.timeframes}
        self.current_candle = {tf: None for tf in self.timeframes}
        # Process each tick to rebuild the candles
        for tick in self.ticks:
            for tf in self.timeframes:
                if tf == "1s":
                    self._update_1s_candle(tick)
                else:
                    self._update_candles_for_timeframe(tf, tick)
        logger.info("Finished rebuilding candles from ticks")
    # Log final results
    for tf in self.timeframes:
        count = len(self.candles[tf])
        logger.info(f"Final {tf} candle count: {count}")
    has_data = cache_loaded or any(self.candles[tf] for tf in self.timeframes)
    logger.info(f"Historical data loading completed. Has data: {has_data}")
    return has_data
def _try_cache_ticks(self):
"""Try to save ticks to cache periodically"""
# Only save to cache every 100 ticks to avoid excessive disk I/O
if len(self.ticks) % 100 == 0:
try:
self._save_to_cache()
except Exception as e:
# Don't spam logs with cache errors, just log once every 1000 ticks
if len(self.ticks) % 1000 == 0:
logger.warning(f"Cache save failed at {len(self.ticks)} ticks: {str(e)}")
pass # Continue even if cache fails
class Position:
    """A single trading position with fee tracking and realised P&L."""

    def __init__(self, action, entry_price, amount, timestamp=None, trade_id=None, fee_rate=0.0002):
        """Open a position.

        Args:
            action: Side of the position; "BUY" is treated as long by
                ``close``, anything else as short.
            entry_price: Price at which the position was opened.
            amount: Position size.
            timestamp: Entry time; defaults to ``datetime.now()``.
            trade_id: Identifier; defaults to the first 8 characters of a
                random UUID4.
            fee_rate: Fee charged on the notional of both entry and exit.
        """
        # Identity and entry details
        self.trade_id = trade_id if trade_id else str(uuid.uuid4())[:8]
        self.action = action
        self.entry_price = entry_price
        self.amount = amount
        self.entry_timestamp = timestamp if timestamp else datetime.now()

        # Exit details are unknown until close() is called
        self.is_open = True
        self.exit_price = None
        self.exit_timestamp = None
        self.pnl = None

        # Fees: the entry fee is charged immediately
        self.fee_rate = fee_rate
        self.paid_fee = entry_price * amount * fee_rate

    def close(self, exit_price, exit_timestamp=None):
        """Close the position and return its P&L net of entry and exit fees.

        Args:
            exit_price: Price at which the position is closed.
            exit_timestamp: Exit time; defaults to ``datetime.now()``.
        """
        self.exit_price = exit_price
        self.exit_timestamp = exit_timestamp if exit_timestamp else datetime.now()
        self.is_open = False

        # Long positions gain when price rises, short ones when it falls.
        if self.action == "BUY":
            price_diff = self.exit_price - self.entry_price
        else:
            price_diff = self.entry_price - self.exit_price

        # The exit fee is added to the fees already paid on entry.
        self.paid_fee += exit_price * self.amount * self.fee_rate
        self.pnl = (price_diff * self.amount) - self.paid_fee
        return self.pnl
class RealTimeChart:
def __init__(self, app=None, symbol='BTCUSDT', timeframe='1m', standalone=True, chart_title=None,
             run_signal_interpreter=False, debug_mode=False, historical_candles=None,
             extended_hours=False, enable_logging=True, agent=None, trading_env=None,
             max_memory_usage=90, memory_check_interval=10, tick_update_interval=0.5,
             chart_update_interval=1, performance_monitoring=False, show_volume=True,
             show_indicators=True, custom_trades=None, port=8050, height=900, width=1200,
             positions_callback=None, allow_synthetic_data=True, tick_storage=None):
    """Initialize a real-time chart with support for multiple indicators and backtesting.

    Stores the configuration, prepares styling and bookkeeping state,
    creates a Dash app when none is given and ``standalone`` is true,
    creates (and back-fills) a ``TickStorage`` when none is given, and
    finally installs the layout and callbacks on the app.
    """
    # --- Configuration passed in by the caller ---------------------------
    self.symbol = symbol
    self.timeframe = timeframe
    self.standalone = standalone
    self.chart_title = chart_title or f"{symbol} Real-Time Chart"
    self.debug_mode = debug_mode
    self.enable_logging = enable_logging
    self.extended_hours = extended_hours
    self.run_signal_interpreter = run_signal_interpreter
    self.historical_candles = historical_candles
    self.custom_trades = custom_trades
    self.positions_callback = positions_callback
    self.allow_synthetic_data = allow_synthetic_data

    # Performance / refresh tuning
    self.performance_monitoring = performance_monitoring
    self.max_memory_usage = max_memory_usage
    self.memory_check_interval = memory_check_interval
    self.tick_update_interval = tick_update_interval
    self.chart_update_interval = chart_update_interval

    # Display options
    self.show_volume = show_volume
    self.show_indicators = show_indicators
    self.port = port
    self.height = height
    self.width = width

    # Trading components
    self.agent = agent
    self.trading_env = trading_env

    # Selected interval in seconds (defaults to the 1s timeframe)
    self.interval_store = {'interval': 1}

    # --- Styling -------------------------------------------------------------
    # Timeframe buttons: the active style is the normal one with a
    # highlighted background and bold text.
    self.button_style = {
        'background': '#343a40',
        'color': 'white',
        'border': 'none',
        'padding': '10px 20px',
        'margin': '0 5px',
        'borderRadius': '4px',
        'cursor': 'pointer'
    }
    self.active_button_style = {
        **self.button_style,
        'background': '#007bff',
        'fontWeight': 'bold'
    }
    self.colors = dict(
        background='#1e1e1e',
        text='#ffffff',
        grid='#333333',
        candle_up='#26a69a',
        candle_down='#ef5350',
        volume_up='rgba(38, 166, 154, 0.3)',
        volume_down='rgba(239, 83, 80, 0.3)',
        ma='#ffeb3b',
        ema='#29b6f6',
        bollinger_bands='#ff9800',
        trades_buy='#00e676',
        trades_sell='#ff1744'
    )

    # --- Runtime state -------------------------------------------------------
    self.all_trades = []  # every recorded trade
    self.positions = []  # open positions
    self.latest_price = 0.0
    self.latest_volume = 0.0
    self.latest_timestamp = datetime.now()
    self.current_balance = 100.0  # starting balance
    self.accumulative_pnl = 0.0  # accumulated profit/loss

    # Trade-rate counters
    self.trade_count = 0
    self.start_time = time.time()
    self.trades_per_second = 0
    self.trades_per_minute = 0
    self.trades_per_hour = 0
    self.trade_times = []  # timestamps of recent trades for rate calculation
    self.last_trade_rate_calculation = datetime.now()
    self.trade_rate = dict.fromkeys(("per_second", "per_minute", "per_hour"), 0)

    # --- Dash application ------------------------------------------------
    self.app = app
    if standalone and self.app is None:
        self.app = dash.Dash(
            __name__,
            external_stylesheets=[dbc.themes.DARKLY],
            suppress_callback_exceptions=True
        )

    # --- Tick storage ----------------------------------------------------
    if tick_storage is not None:
        self.tick_storage = tick_storage
    else:
        # Use TimescaleDB only when it is enabled and a handler exists.
        use_timescaledb = TIMESCALEDB_ENABLED and timescaledb_handler is not None
        self.tick_storage = TickStorage(
            symbol=symbol,
            timeframes=["1s", "1m", "5m", "15m", "1h", "4h", "1d"],
            use_timescaledb=use_timescaledb
        )
        # Back-fill right away so the chart has data on a cold start.
        logger.info(f"Loading historical data for {symbol} during chart initialization")
        try:
            if self.tick_storage.load_historical_data(symbol):
                logger.info(f"Successfully loaded historical data for {symbol}")
                for tf in ("1s", "1m", "5m", "15m", "1h"):
                    candle_count = len(self.tick_storage.candles.get(tf, []))
                    logger.info(f" {tf}: {candle_count} candles")
            else:
                logger.warning(f"Failed to load historical data for {symbol}")
        except Exception as e:
            logger.error(f"Error loading historical data during initialization: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())

    # --- Layout and callbacks ------------------------------------------------
    if self.app is not None:
        self.app.layout = self._create_layout()
        self._setup_callbacks()

    if self.enable_logging:
        logger.info(f"RealTimeChart initialized: {self.symbol} ({self.timeframe}) ")
def _create_layout(self):
    """Build the Dash layout for the dashboard.

    The layout contains a header (title, current price, balance,
    accumulated PnL and trade rate), a 500 ms ``dcc.Interval`` that drives
    refreshes, the timeframe buttons ('1s' starts as the active one), the
    ``interval-store`` holding the selected interval in seconds, the two
    graphs and the positions list. The heading shows ``self.chart_title``,
    which defaults to "<symbol> Real-Time Chart".

    Returns:
        html.Div: Root component of the page.
    """
    inline = {"display": "inline-block"}

    def stat_block(label, value_component):
        # One "Label: value" group in the header row.
        return html.Div([
            html.H5(label, style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}),
            value_component,
        ], style={"display": "inline-block", "marginLeft": "40px"})

    trade_rate_value = html.Span([
        html.Span(id="trade-rate-second", style={"color": "#ff7f0e"}),
        html.Span("/s, "),
        html.Span(id="trade-rate-minute", style={"color": "#ff7f0e"}),
        html.Span("/m, "),
        html.Span(id="trade-rate-hour", style={"color": "#ff7f0e"}),
        html.Span("/h")
    ], style=dict(inline))

    header = html.Div([
        # Honour the chart_title passed to the constructor.
        html.H1(self.chart_title, className="display-4"),
        html.Div([
            html.H4("Current Price:", style={"display": "inline-block", "marginRight": "10px"}),
            html.H3(id="current-price", style={"display": "inline-block", "color": "#17a2b8"}),
            stat_block("Balance:",
                       html.H5(id="current-balance", style={"display": "inline-block", "color": "#28a745"})),
            stat_block("Accumulated PnL:",
                       html.H5(id="accumulated-pnl", style={"display": "inline-block", "color": "#ffc107"})),
            stat_block("Trade Rate:", trade_rate_value),
        ], style={"textAlign": "center", "margin": "20px 0"}),
    ], style={"textAlign": "center", "marginBottom": "20px"})

    # Button ids are 'btn-<label>'; the callbacks rely on these exact ids.
    timeframe_labels = ('1s', '5s', '15s', '1m', '5m', '15m', '1h')
    timeframe_buttons = html.Div([
        html.Button(
            label,
            id=f'btn-{label}',
            n_clicks=0,
            style=self.active_button_style if label == '1s' else self.button_style
        )
        for label in timeframe_labels
    ], style={"textAlign": "center", "marginBottom": "20px"})

    return html.Div([
        header,
        # Periodic refresh trigger
        dcc.Interval(
            id='interval-component',
            interval=500,  # in milliseconds
            n_intervals=0
        ),
        timeframe_buttons,
        # Store for the selected timeframe
        dcc.Store(id='interval-store', data={'interval': 1}),
        # Chart content (without wrapper div to avoid callback issues)
        dcc.Graph(id='live-chart', style={"height": "600px"}),
        dcc.Graph(id='secondary-charts', style={"height": "500px"}),
        html.Div(id='positions-list')
    ])
def _create_chart_and_controls(self):
    """Create the chart and controls for the dashboard.

    Renders the main chart for the interval currently held in
    ``self.interval_store`` (1 second when unavailable), the secondary
    charts and the positions list. On any error a red error message div is
    returned instead.
    """
    try:
        # Resolve the selected interval, defaulting to 1s.
        store = getattr(self, 'interval_store', None)
        interval_seconds = store.get('interval', 1) if store else 1

        main_graph = dcc.Graph(
            id='live-chart',
            figure=self._update_main_chart(interval_seconds),
            style={"height": "600px"}
        )
        secondary_graph = dcc.Graph(
            id='secondary-charts',
            figure=self._update_secondary_charts(),
            style={"height": "500px"}
        )
        positions_panel = html.Div(
            id='positions-list',
            children=self._get_position_list_rows()
        )
        return html.Div([main_graph, secondary_graph, positions_panel])
    except Exception as e:
        logger.error(f"Error creating chart and controls: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        # Simple error message as fallback
        return html.Div(f"Error loading chart: {str(e)}", style={"color": "red", "padding": "20px"})
def _setup_callbacks(self):
    """Setup Dash callbacks for the real-time chart.

    Registers two callbacks on ``self.app``: one that refreshes every
    chart/label on each interval tick or timeframe change, and one that
    turns timeframe button clicks into the stored interval plus button
    styles. Does nothing when there is no app; registration errors are
    logged, not raised.
    """
    if self.app is None:
        return

    button_names = ['btn-1s', 'btn-5s', 'btn-15s', 'btn-1m', 'btn-5m', 'btn-15m', 'btn-1h']

    try:
        refresh_outputs = [
            Output('live-chart', 'figure'),
            Output('secondary-charts', 'figure'),
            Output('positions-list', 'children'),
        ] + [
            Output(component_id, 'children')
            for component_id in (
                'current-price', 'current-balance', 'accumulated-pnl',
                'trade-rate-second', 'trade-rate-minute', 'trade-rate-hour',
            )
        ]

        @self.app.callback(
            refresh_outputs,
            [
                Input('interval-component', 'n_intervals'),
                Input('interval-store', 'data')
            ]
        )
        def update_all(n_intervals, interval_data):
            """Update all chart components"""
            try:
                # Selected interval in seconds, 1 when nothing is stored yet.
                interval_seconds = 1
                if interval_data:
                    interval_seconds = interval_data.get('interval', 1)

                main_chart = self._update_main_chart(interval_seconds)
                secondary_charts = self._update_secondary_charts()
                positions_list = self._get_position_list_rows()

                # Header labels
                if self.latest_price:
                    current_price = f"${self.latest_price:.2f}"
                else:
                    current_price = "Error"
                current_balance = f"${self.current_balance:.2f}"
                accumulated_pnl = f"${self.accumulative_pnl:.2f}"

                # Trade rates
                trade_rate = self._calculate_trade_rate()
                trade_rate_second = f"{trade_rate['per_second']:.1f}"
                trade_rate_minute = f"{trade_rate['per_minute']:.1f}"
                trade_rate_hour = f"{trade_rate['per_hour']:.1f}"

                return (
                    main_chart, secondary_charts, positions_list,
                    current_price, current_balance, accumulated_pnl,
                    trade_rate_second, trade_rate_minute, trade_rate_hour,
                )
            except Exception as e:
                logger.error(f"Error in update_all callback: {str(e)}")
                # Placeholder figure and neutral labels
                import plotly.graph_objects as go
                empty_fig = go.Figure()
                empty_fig.add_annotation(text="Chart Loading...", xref="paper", yref="paper", x=0.5, y=0.5)
                return (empty_fig, empty_fig, [], "Loading...", "$0.00", "$0.00", "0.0", "0.0", "0.0")

        @self.app.callback(
            [Output('interval-store', 'data')] + [Output(name, 'style') for name in button_names],
            [Input(name, 'n_clicks') for name in button_names]
        )
        def update_timeframe(n1s, n5s, n15s, n1m, n5m, n15m, n1h):
            """Update selected timeframe based on button clicks"""
            ctx = dash.callback_context
            if not ctx.triggered:
                # Nothing clicked yet: 1s is the default selection.
                styles = [self.active_button_style] + [self.button_style] * 6
                return ({'interval': 1}, *styles)

            button_id = ctx.triggered[0]['prop_id'].split('.')[0]

            # Button id -> interval in seconds (unknown ids fall back to 1)
            interval_map = dict(zip(button_names, (1, 5, 15, 60, 300, 900, 3600)))
            selected_interval = interval_map.get(button_id, 1)

            # Highlight only the clicked button.
            styles = [
                self.active_button_style if name == button_id else self.button_style
                for name in button_names
            ]
            return ({'interval': selected_interval}, *styles)

        logger.info("Dash callbacks registered successfully")
    except Exception as e:
        logger.error(f"Error setting up callbacks: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
def _calculate_trade_rate(self):
"""Calculate trading rate per second, minute, and hour"""
try:
now = datetime.now()
current_time = time.time()
# Filter trades within different time windows
trades_last_second = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 1)
trades_last_minute = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 60)
trades_last_hour = sum(1 for trade_time in self.trade_times if current_time - trade_time <= 3600)
return {
"per_second": trades_last_second,
"per_minute": trades_last_minute,
"per_hour": trades_last_hour
}
except Exception as e:
logger.warning(f"Error calculating trade rate: {str(e)}")
return {"per_second": 0.0, "per_minute": 0.0, "per_hour": 0.0}
def _update_secondary_charts(self):
    """Create the secondary figure: volume bars plus SMA-20 and RSI.

    Uses the last 100 stored "1m" candles. Row 1 shows volume, coloured by
    whether the close rose against the previous candle. Row 2 shows the
    20-period simple moving average on the price axis and the 14-period
    RSI on a secondary y-axis fixed to 0-100, so the two very different
    scales do not flatten each other.

    Returns:
        plotly.graph_objects.Figure: The figure; a placeholder with a
        message when there is no data or an error occurs.
    """
    try:
        fig = make_subplots(
            rows=2, cols=1,
            subplot_titles=['Volume', 'Technical Indicators'],
            shared_xaxes=True,
            vertical_spacing=0.1,
            row_heights=[0.3, 0.7],
            # A trace added with row/col is bound to that cell's axes, so a
            # separate RSI scale needs a real secondary y-axis in row 2.
            specs=[[{}], [{"secondary_y": True}]]
        )

        # Last 100 candles only, for performance
        candles = self.tick_storage.candles.get("1m", [])[-100:]
        if not candles:
            fig.add_annotation(text="No data available", xref="paper", yref="paper",
                               x=0.5, y=0.5, showarrow=False)
            fig.update_layout(
                title="Secondary Charts",
                template="plotly_dark",
                height=400
            )
            return fig

        timestamps = [candle['timestamp'] for candle in candles]
        volumes = [candle['volume'] for candle in candles]
        closes = [candle['close'] for candle in candles]

        # Volume bars: green when the close did not fall, red otherwise.
        colors = ['#26a69a'] + [
            '#26a69a' if current >= previous else '#ef5350'
            for previous, current in zip(closes, closes[1:])
        ]
        fig.add_trace(
            go.Bar(
                x=timestamps,
                y=volumes,
                name='Volume',
                marker_color=colors,
                showlegend=False
            ),
            row=1, col=1
        )

        # Simple moving average (price scale)
        if len(closes) >= 20:
            sma_20 = pd.Series(closes).rolling(window=20).mean()
            fig.add_trace(
                go.Scatter(
                    x=timestamps,
                    y=sma_20,
                    name='SMA 20',
                    line=dict(color='#ffeb3b', width=2)
                ),
                row=2, col=1, secondary_y=False
            )

        # RSI (0-100 scale on the secondary axis)
        if len(closes) >= 14:
            rsi = self._calculate_rsi(closes, 14)
            fig.add_trace(
                go.Scatter(
                    x=timestamps,
                    y=rsi,
                    name='RSI',
                    line=dict(color='#29b6f6', width=2)
                ),
                row=2, col=1, secondary_y=True
            )

        fig.update_layout(
            title="Volume & Technical Indicators",
            template="plotly_dark",
            height=400,
            showlegend=True,
            legend=dict(x=0, y=1, bgcolor='rgba(0,0,0,0)')
        )

        fig.update_yaxes(title_text="Volume", row=1, col=1)
        fig.update_yaxes(title_text="Price", row=2, col=1, secondary_y=False)
        fig.update_yaxes(title_text="RSI", range=[0, 100], showgrid=False,
                         row=2, col=1, secondary_y=True)
        return fig
    except Exception as e:
        logger.error(f"Error creating secondary charts: {str(e)}")
        # Return a placeholder figure on error
        fig = go.Figure()
        fig.add_annotation(text=f"Error: {str(e)}", xref="paper", yref="paper",
                           x=0.5, y=0.5, showarrow=False)
        fig.update_layout(template="plotly_dark", height=400)
        return fig
def _calculate_rsi(self, prices, period=14):
"""Calculate RSI indicator"""
try:
prices = pd.Series(prices)
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi.fillna(50).tolist() # Fill NaN with neutral RSI value
except Exception:
return [50] * len(prices) # Return neutral RSI on error
def _get_position_list_rows(self):
    """Get list of current positions for display.

    Builds one row per entry in ``self.positions`` showing side, size,
    entry price and PnL against ``self.latest_price`` (sign flipped for
    SELL). A position that fails to format is skipped with a warning.

    Returns:
        list: Dash ``html.Div`` rows, or a single placeholder/error row.
    """
    green, red = "#00e676", "#ff1744"
    try:
        if not self.positions:
            return [html.Div("No open positions", style={"color": "#888", "padding": "10px"})]

        rows = []
        for i, position in enumerate(self.positions):
            try:
                # PnL at the latest price; shorts profit when price falls.
                current_pnl = (self.latest_price - position.entry_price) * position.amount
                if position.action.upper() == 'SELL':
                    current_pnl = -current_pnl

                side_color = green if position.action.upper() == "BUY" else red
                pnl_color = green if current_pnl >= 0 else red
                cells = [
                    html.Span(f"#{i+1}: ", style={"fontWeight": "bold"}),
                    html.Span(f"{position.action.upper()} ", style={"color": side_color}),
                    html.Span(f"{position.amount:.4f} @ ${position.entry_price:.2f} "),
                    html.Span(f"PnL: ${current_pnl:.2f}", style={"color": pnl_color}),
                ]
                rows.append(html.Div(cells, style={"padding": "5px", "borderBottom": "1px solid #333"}))
            except Exception as e:
                logger.warning(f"Error formatting position {i}: {str(e)}")
        return rows
    except Exception as e:
        logger.error(f"Error getting position list: {str(e)}")
        return [html.Div("Error loading positions", style={"color": "red", "padding": "10px"})]
def add_trade(self, action, price, amount, timestamp=None, trade_id=None):
    """Add a trade to the chart and update tracking.

    Records the trade, refreshes the trade-rate window (last hour), opens a
    ``Position`` for BUY/SELL actions, adjusts the balance by the trade
    value and adds a simple PnL estimate against the most recent trade on
    the opposite side. Errors are logged, not raised.

    Args:
        action: Trade side; compared case-insensitively ('BUY' / 'SELL').
        price: Trade price; converted with ``float``.
        amount: Trade size; converted with ``float``.
        timestamp: Trade time; defaults to ``datetime.now()``.
        trade_id: Identifier; defaults to a random UUID4 string.
    """
    try:
        if timestamp is None:
            timestamp = datetime.now()

        # Normalise inputs once so every later step, including the final
        # log line, works on the same numeric values.
        side = action.upper()
        unit_price = float(price)
        quantity = float(amount)

        trade = {
            'id': trade_id or str(uuid.uuid4()),
            'action': side,
            'price': unit_price,
            'amount': quantity,
            'timestamp': timestamp,
            'value': unit_price * quantity
        }
        self.all_trades.append(trade)

        # Trade-rate tracking: keep only the last hour of trade times.
        recorded_at = time.time()
        self.trade_times.append(recorded_at)
        cutoff_time = recorded_at - 3600
        self.trade_times = [t for t in self.trade_times if t > cutoff_time]

        # Update positions
        if side in ('BUY', 'SELL'):
            self.positions.append(Position(
                action=side,
                entry_price=unit_price,
                amount=quantity,
                timestamp=timestamp,
                trade_id=trade['id']
            ))

        # Update balance: buying spends the trade value, anything else adds it.
        if side == 'BUY':
            self.current_balance -= trade['value']
        else:  # SELL
            self.current_balance += trade['value']

        # Simple PnL estimate against the latest opposite-side trade -
        # more sophisticated matching could be added.
        if len(self.all_trades) > 1:
            last_trade = next(
                (t for t in reversed(self.all_trades[:-1]) if t['action'] != side),
                None
            )
            if last_trade is not None:
                if side == 'SELL':
                    pnl = (unit_price - last_trade['price']) * quantity
                else:  # BUY
                    pnl = (last_trade['price'] - unit_price) * quantity
                self.accumulative_pnl += pnl

        # Format the converted price: ':.2f' on a raw numeric string would
        # raise after the trade had already been recorded.
        logger.info(f"Added trade: {side} {amount} @ ${unit_price:.2f}")
    except Exception as e:
        logger.error(f"Error adding trade: {str(e)}")
def _get_interval_key(self, interval_seconds):
    """Map an interval length in seconds to a timeframe key.

    Args:
        interval_seconds: Interval length in seconds.

    Returns:
        One of "1s", "5s", "15s", "1m", "5m", "15m", "1h", "4h" or "1d".
        "5s" and "15s" are returned only when the tick storage lists them
        in its timeframes; otherwise "1s" and "1m" are used respectively.
    """
    if interval_seconds <= 1:
        return "1s"

    # Sub-minute keys are optional, so each carries its own fallback.
    if interval_seconds <= 15:
        if interval_seconds <= 5:
            preferred, fallback = "5s", "1s"
        else:
            preferred, fallback = "15s", "1m"
        if preferred in self.tick_storage.timeframes:
            return preferred
        return fallback

    # Remaining keys: the first upper bound that fits wins.
    thresholds = (
        (60, "1m"),
        (300, "5m"),
        (900, "15m"),
        (3600, "1h"),
        (14400, "4h"),
    )
    for upper_bound, key in thresholds:
        if interval_seconds <= upper_bound:
            return key
    return "1d"
def _update_main_chart(self, interval_seconds):
    """Build the candlestick figure for the timeframe matching the interval.

    The figure shows up to the last 100 candles, BUY/SELL markers for the
    trades whose timestamp falls inside the plotted window, and 20- and
    50-period simple moving averages when enough closes are available.
    As a side effect, ``latest_price`` and ``latest_timestamp`` are
    refreshed from the newest candle.

    Args:
        interval_seconds: Requested candle interval in seconds.

    Returns:
        A plotly Figure. When there are no candles, or when anything fails,
        a placeholder figure with an explanatory annotation is returned
        instead of raising.
    """
    try:
        interval_key = self._get_interval_key(interval_seconds)

        # Only the most recent 100 candles are drawn, for performance.
        candles = self.tick_storage.candles.get(interval_key, [])[-100:]

        if not candles:
            logger.warning(f"No candle data available for {interval_key}")
            fig = go.Figure()
            fig.add_annotation(
                text=f"No data available for {interval_key}",
                xref="paper", yref="paper",
                x=0.5, y=0.5,
                showarrow=False,
                font=dict(size=16, color="white")
            )
            fig.update_layout(
                title=f"{self.symbol} - {interval_key} Chart",
                template="plotly_dark",
                height=600
            )
            return fig

        # Split the candle dicts into per-field series.
        timestamps = [candle['timestamp'] for candle in candles]
        opens = [candle['open'] for candle in candles]
        highs = [candle['high'] for candle in candles]
        lows = [candle['low'] for candle in candles]
        closes = [candle['close'] for candle in candles]

        fig = go.Figure()
        fig.add_trace(go.Candlestick(
            x=timestamps,
            open=opens,
            high=highs,
            low=lows,
            close=closes,
            name="Price",
            increasing_line_color='#26a69a',
            decreasing_line_color='#ef5350',
            increasing_fillcolor='#26a69a',
            decreasing_fillcolor='#ef5350'
        ))

        # Trade markers: only trades inside the plotted time window.
        window_start, window_end = timestamps[0], timestamps[-1]
        visible_trades = [
            trade for trade in self.all_trades
            if window_start <= trade['timestamp'] <= window_end
        ]
        marker_styles = (
            ('BUY', 'triangle-up', '#00e676'),
            ('SELL', 'triangle-down', '#ff1744'),
        )
        for side, marker_symbol, marker_color in marker_styles:
            side_trades = [t for t in visible_trades if t['action'] == side]
            if not side_trades:
                continue
            fig.add_trace(go.Scatter(
                x=[t['timestamp'] for t in side_trades],
                y=[t['price'] for t in side_trades],
                mode='markers',
                marker=dict(
                    symbol=marker_symbol,
                    size=12,
                    color=marker_color,
                    line=dict(color='white', width=1)
                ),
                name=side,
                text=[f"{side} {t['amount']:.4f} @ ${t['price']:.2f}" for t in side_trades],
                # <br> is Plotly's line break inside hover text; a raw
                # newline inside the quoted literal is a syntax error.
                hovertemplate='%{text}<br>Time: %{x}'
            ))

        # Moving averages, each drawn only once enough closes exist.
        close_series = pd.Series(closes)
        for window, line_color in ((20, '#ffeb3b'), (50, '#ff9800')):
            if len(closes) < window:
                continue
            fig.add_trace(go.Scatter(
                x=timestamps,
                y=close_series.rolling(window=window).mean(),
                name=f'SMA {window}',
                line=dict(color=line_color, width=1),
                opacity=0.7
            ))

        fig.update_layout(
            title=f"{self.symbol} - {interval_key} Chart ({len(candles)} candles)",
            template="plotly_dark",
            height=600,
            xaxis_title="Time",
            yaxis_title="Price ($)",
            legend=dict(
                yanchor="top",
                y=0.99,
                xanchor="left",
                x=0.01,
                bgcolor="rgba(0,0,0,0.5)"
            ),
            hovermode='x unified',
            dragmode='pan',
            # The range slider is switched off for better performance.
            xaxis_rangeslider_visible=False
        )

        # The newest candle becomes the chart's latest known price.
        self.latest_price = closes[-1]
        self.latest_timestamp = timestamps[-1]

        return fig
    except Exception as e:
        logger.error(f"Error updating main chart: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())

        # Hand back a figure that shows the error instead of raising.
        fig = go.Figure()
        fig.add_annotation(
            text=f"Chart Error: {str(e)}",
            xref="paper", yref="paper",
            x=0.5, y=0.5,
            showarrow=False,
            font=dict(size=16, color="red")
        )
        fig.update_layout(
            title="Chart Error",
            template="plotly_dark",
            height=600
        )
        return fig
def set_trading_env(self, trading_env):
    """Attach a trading environment and subscribe to its trade callbacks.

    Args:
        trading_env: Environment object to store on the chart. When it
            exposes ``add_trade_callback``, the chart's ``add_trade`` is
            registered through it.
    """
    self.trading_env = trading_env

    # A private sentinel separates "attribute missing" from any real value.
    not_found = object()
    register_callback = getattr(trading_env, 'add_trade_callback', not_found)
    if register_callback is not not_found:
        register_callback(self.add_trade)

    logger.info("Trading environment integrated with chart")
def set_agent(self, agent):
    """Attach the agent whose trading decisions the chart should monitor.

    Args:
        agent: Agent object; this method only stores it on the instance.
    """
    # Nothing is read from the agent here; it is kept for later use.
    self.agent = agent
    logger.info("Agent integrated with chart")
def update_from_env(self, env_data):
    """Synchronise the chart state with a trading-environment snapshot.

    Args:
        env_data: Mapping that may contain any of the keys
            'latest_price', 'balance', 'pnl' and 'trades'. 'trades' is an
            iterable of dicts with optional 'id', 'action', 'price',
            'amount' and 'timestamp' entries.

    Returns:
        None. Any exception is logged and swallowed.
    """
    try:
        # The price goes first: it is the default for trades without one.
        if 'latest_price' in env_data:
            self.latest_price = env_data['latest_price']

        try:
            if 'trades' in env_data:
                # Stored trades are normalised records, so an incoming dict
                # never compares equal to one of them. Duplicates are
                # detected by id, or by content when no id is supplied.
                seen_ids = {
                    t['id'] for t in self.all_trades
                    if t.get('id') is not None
                }
                seen_keys = {
                    (t.get('action'), t.get('price'),
                     t.get('amount'), t.get('timestamp'))
                    for t in self.all_trades
                }

                for trade in env_data['trades']:
                    action = trade.get('action', 'HOLD')
                    price = trade.get('price', self.latest_price)
                    amount = trade.get('amount', 0.1)
                    trade_id = trade.get('id')

                    if trade_id:
                        if trade_id in seen_ids:
                            continue
                        seen_ids.add(trade_id)
                    elif 'timestamp' in trade:
                        try:
                            key = (action.upper(), float(price),
                                   float(amount), trade['timestamp'])
                            duplicate = key in seen_keys
                        except (AttributeError, TypeError, ValueError):
                            # Malformed trade: pass it on so add_trade
                            # reports the problem.
                            key, duplicate = None, False
                        if duplicate:
                            continue
                        if key is not None:
                            seen_keys.add(key)

                    self.add_trade(
                        action=action,
                        price=price,
                        amount=amount,
                        timestamp=trade.get('timestamp', datetime.now()),
                        trade_id=trade_id
                    )
        finally:
            # Applied after the trades so that add_trade's own balance/PnL
            # bookkeeping is not stacked on top of the reported figures.
            if 'balance' in env_data:
                self.current_balance = env_data['balance']
            if 'pnl' in env_data:
                self.accumulative_pnl = env_data['pnl']
    except Exception as e:
        logger.error(f"Error updating from environment: {str(e)}")
def get_latest_data(self):
    """Return a snapshot of the chart's current state for external systems.

    Returns:
        dict with the latest price, volume and timestamp, the current
        balance and accumulated PnL, the number of positions and trades,
        and the value returned by ``_calculate_trade_rate()``.
    """
    # Market fields first, then the account figures.
    snapshot = dict(
        latest_price=self.latest_price,
        latest_volume=self.latest_volume,
        latest_timestamp=self.latest_timestamp,
        current_balance=self.current_balance,
        accumulative_pnl=self.accumulative_pnl,
    )
    # Counts are reported, not the underlying collections.
    snapshot['positions'] = len(self.positions)
    snapshot['trade_count'] = len(self.all_trades)
    snapshot['trade_rate'] = self._calculate_trade_rate()
    return snapshot
async def start_websocket(self):
    """Consume the Binance ticker stream and feed ticks into the chart.

    Each message updates the tick storage and the chart's latest price,
    volume and timestamp. The coroutine runs until the connection ends;
    it is awaited inline and does not spawn a background task. Connection
    and message errors are logged, not raised.
    """
    # NOTE(review): the stream is fixed to ETH/USDT and does not follow
    # self.symbol, although the log line below reports self.symbol.
    # Confirm whether this is intended.
    websocket_url = "wss://stream.binance.com:9443/ws/ethusdt@ticker"

    try:
        logger.info("Starting websocket connection for real-time data")

        try:
            # "ws" avoids shadowing the imported "websocket" module.
            async with websockets.connect(websocket_url) as ws:
                logger.info(f"WebSocket connected for {self.symbol}")

                message_count = 0
                async for message in ws:
                    try:
                        data = json.loads(message)

                        # Parse once and read the clock once, so the stored
                        # tick and the chart's latest_* fields agree.
                        # NOTE(review): per Binance's ticker payload, 'c'
                        # is the last price and 'v' is the rolling 24h
                        # base-asset volume, not a per-tick size. Confirm
                        # that tick_storage expects that.
                        price = float(data['c'])
                        volume = float(data['v'])
                        received_at = pd.Timestamp.now()

                        self.tick_storage.add_tick({
                            'price': price,
                            'volume': volume,
                            'timestamp': received_at
                        })

                        self.latest_price = price
                        self.latest_volume = volume
                        self.latest_timestamp = received_at

                        message_count += 1

                        # Progress is logged every 100 messages.
                        if message_count % 100 == 0:
                            logger.info(f"Received message #{message_count}")
                            logger.info(f"Processed {message_count} ticks, current price: ${self.latest_price:.2f}")
                            candle_count = len(self.tick_storage.candles.get("1s", []))
                            logger.info(f"Current 1s candles count: {candle_count}")
                    except json.JSONDecodeError as e:
                        logger.warning(f"Failed to parse websocket message: {str(e)}")
                    except Exception as e:
                        # One bad message must not end the stream.
                        logger.error(f"Error processing websocket message: {str(e)}")
        except websockets.exceptions.ConnectionClosed:
            logger.warning("WebSocket connection closed")
        except Exception as e:
            logger.error(f"WebSocket error: {str(e)}")
    except Exception as e:
        logger.error(f"Error starting websocket: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
def run(self, host='127.0.0.1', port=8050, debug=False):
    """Run the Dash app's web server (blocks until the server stops).

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.
        debug: Passed through to the Dash server as its debug flag.

    Returns:
        None. Returns early with an error log when no Dash app exists;
        any other exception is logged together with its traceback.
    """
    try:
        if self.app is None:
            logger.error("No Dash app instance available")
            return

        # Advertise the address actually served. Loopback and wildcard
        # binds are reachable from this machine as "localhost".
        display_host = 'localhost' if host in ('127.0.0.1', '0.0.0.0') else host

        logger.info("="*60)
        logger.info(f"🔗 ACCESS WEB UI AT: http://{display_host}:{port}/")
        logger.info("📊 View live trading data and charts in your browser")
        logger.info("="*60)

        # Prefer Dash's current "run" entry point and fall back to the
        # legacy "run_server" on releases that lack it.
        start_server = getattr(self.app, 'run', None)
        if not callable(start_server):
            start_server = self.app.run_server

        start_server(
            host=host,
            port=port,
            debug=debug,
            use_reloader=False,  # Disable reloader to avoid conflicts
            threaded=True  # Enable threading for better performance
        )
    except Exception as e:
        logger.error(f"Error running Dash app: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())