import asyncio
import json
import logging
from typing import Dict, List, Optional, Tuple, Union
import websockets
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import dash
from dash import html, dcc
from dash.dependencies import Input, Output
import pandas as pd
import numpy as np
from collections import deque
import time
from threading import Thread
import requests
import os
from datetime import datetime, timedelta
import pytz
import tzlocal
import threading
import random
import dash_bootstrap_components as dbc
import uuid
import ta
from sklearn.preprocessing import MinMaxScaler
import re
import psutil
import gc
import websocket
# Import psycopg2 with error handling
try:
import psycopg2
PSYCOPG2_AVAILABLE = True
except ImportError:
PSYCOPG2_AVAILABLE = False
psycopg2 = None
# TimescaleDB configuration from environment variables
TIMESCALEDB_ENABLED = os.environ.get('TIMESCALEDB_ENABLED', '1') == '1' and PSYCOPG2_AVAILABLE
TIMESCALEDB_HOST = os.environ.get('TIMESCALEDB_HOST', '192.168.0.10')
TIMESCALEDB_PORT = int(os.environ.get('TIMESCALEDB_PORT', '5432'))
TIMESCALEDB_USER = os.environ.get('TIMESCALEDB_USER', 'postgres')
TIMESCALEDB_PASSWORD = os.environ.get('TIMESCALEDB_PASSWORD', 'timescaledbpass')
TIMESCALEDB_DB = os.environ.get('TIMESCALEDB_DB', 'candles')
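# Example (illustrative): these settings can be overridden before the module
# is imported, either from the shell:
#   TIMESCALEDB_ENABLED=1 TIMESCALEDB_HOST=db.internal python realtime_chart.py
# or programmatically, prior to import:
#   os.environ["TIMESCALEDB_HOST"] = "db.internal"
# The host/credential defaults above are placeholders for a local setup.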
class TimescaleDBHandler:
"""Handler for TimescaleDB operations for candle storage and retrieval"""
def __init__(self):
"""Initialize TimescaleDB connection if enabled"""
self.enabled = TIMESCALEDB_ENABLED
self.conn = None
if not self.enabled:
if not PSYCOPG2_AVAILABLE:
print("psycopg2 module not available. TimescaleDB integration disabled.")
return
try:
# Connect to TimescaleDB
self.conn = psycopg2.connect(
host=TIMESCALEDB_HOST,
port=TIMESCALEDB_PORT,
user=TIMESCALEDB_USER,
password=TIMESCALEDB_PASSWORD,
dbname=TIMESCALEDB_DB
)
print(f"Connected to TimescaleDB at {TIMESCALEDB_HOST}:{TIMESCALEDB_PORT}")
# Ensure the candles table exists
self._ensure_table()
print("TimescaleDB integration initialized successfully")
except Exception as e:
print(f"Error connecting to TimescaleDB: {str(e)}")
self.enabled = False
self.conn = None
def _ensure_table(self):
"""Ensure the candles table exists with TimescaleDB hypertable"""
if not self.conn:
return
try:
with self.conn.cursor() as cur:
# Create the candles table if it doesn't exist
cur.execute('''
CREATE TABLE IF NOT EXISTS candles (
symbol TEXT,
interval TEXT,
timestamp TIMESTAMPTZ,
open DOUBLE PRECISION,
high DOUBLE PRECISION,
low DOUBLE PRECISION,
close DOUBLE PRECISION,
volume DOUBLE PRECISION,
PRIMARY KEY (symbol, interval, timestamp)
);
''')
# Check if the table is already a hypertable
cur.execute('''
SELECT EXISTS (
SELECT 1 FROM timescaledb_information.hypertables
WHERE hypertable_name = 'candles'
);
''')
is_hypertable = cur.fetchone()[0]
# Convert to hypertable if not already done
if not is_hypertable:
cur.execute('''
SELECT create_hypertable('candles', 'timestamp',
if_not_exists => TRUE,
migrate_data => TRUE
);
''')
self.conn.commit()
print("TimescaleDB table structure verified")
except Exception as e:
print(f"Error setting up TimescaleDB tables: {str(e)}")
self.enabled = False
def upsert_candle(self, symbol, interval, candle):
"""Insert or update a candle in TimescaleDB"""
if not self.enabled or not self.conn:
return False
try:
with self.conn.cursor() as cur:
cur.execute('''
INSERT INTO candles (
symbol, interval, timestamp,
open, high, low, close, volume
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (symbol, interval, timestamp)
DO UPDATE SET
open = EXCLUDED.open,
high = EXCLUDED.high,
low = EXCLUDED.low,
close = EXCLUDED.close,
volume = EXCLUDED.volume
''', (
symbol, interval, candle['timestamp'],
candle['open'], candle['high'], candle['low'],
candle['close'], candle['volume']
))
self.conn.commit()
return True
except Exception as e:
print(f"Error upserting candle to TimescaleDB: {str(e)}")
# Try to reconnect on error
try:
self.conn = psycopg2.connect(
host=TIMESCALEDB_HOST,
port=TIMESCALEDB_PORT,
user=TIMESCALEDB_USER,
password=TIMESCALEDB_PASSWORD,
dbname=TIMESCALEDB_DB
)
            except Exception:
                pass
return False
def fetch_candles(self, symbol, interval, limit=1000):
"""Fetch candles from TimescaleDB"""
if not self.enabled or not self.conn:
return []
try:
with self.conn.cursor() as cur:
cur.execute('''
SELECT timestamp, open, high, low, close, volume
FROM candles
WHERE symbol = %s AND interval = %s
ORDER BY timestamp DESC
LIMIT %s
''', (symbol, interval, limit))
rows = cur.fetchall()
# Convert to list of dictionaries (ordered from oldest to newest)
candles = []
for row in reversed(rows): # Reverse to get oldest first
candle = {
'timestamp': row[0],
'open': row[1],
'high': row[2],
'low': row[3],
'close': row[4],
'volume': row[5]
}
candles.append(candle)
return candles
except Exception as e:
print(f"Error fetching candles from TimescaleDB: {str(e)}")
# Try to reconnect on error
try:
self.conn = psycopg2.connect(
host=TIMESCALEDB_HOST,
port=TIMESCALEDB_PORT,
user=TIMESCALEDB_USER,
password=TIMESCALEDB_PASSWORD,
dbname=TIMESCALEDB_DB
)
            except Exception:
                pass
return []
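# Usage sketch (illustrative; assumes a reachable TimescaleDB instance with the
# timescaledb extension installed):
#   handler = TimescaleDBHandler()
#   if handler.enabled:
#       handler.upsert_candle("BTCUSDT", "1m", {
#           'timestamp': datetime(2024, 1, 1, 12, 0, tzinfo=pytz.UTC),
#           'open': 42000.0, 'high': 42100.0, 'low': 41950.0,
#           'close': 42050.0, 'volume': 12.5,
#       })
#       recent = handler.fetch_candles("BTCUSDT", "1m", limit=100)
# Re-running upsert_candle with the same (symbol, interval, timestamp) simply
# overwrites the OHLCV values via the ON CONFLICT clause.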
class BinanceHistoricalData:
"""
Class for fetching historical price data from Binance.
"""
def __init__(self):
self.base_url = "https://api.binance.com/api/v3"
self.cache_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cache')
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
# Timestamp of last data update
self.last_update = None
def get_historical_candles(self, symbol, interval_seconds=3600, limit=1000):
"""
Fetch historical candles from Binance API.
Args:
symbol (str): Trading pair symbol (e.g., "BTC/USDT")
interval_seconds (int): Timeframe in seconds (e.g., 3600 for 1h)
limit (int): Number of candles to fetch
Returns:
pd.DataFrame: DataFrame with OHLCV data
"""
# Convert interval_seconds to Binance interval format
interval_map = {
1: "1s",
60: "1m",
300: "5m",
900: "15m",
1800: "30m",
3600: "1h",
14400: "4h",
86400: "1d"
}
interval = interval_map.get(interval_seconds, "1h")
# Format symbol for Binance API (remove slash and make uppercase)
formatted_symbol = symbol.replace("/", "").upper()
        # Check if we have cached data first
        cached_data = self._load_from_cache(formatted_symbol, interval)
# If we have cached data that's recent enough, use it
if cached_data is not None and len(cached_data) >= limit:
cache_age_minutes = (datetime.now() - self.last_update).total_seconds() / 60 if self.last_update else 60
if cache_age_minutes < 15: # Only use cache if it's less than 15 minutes old
logger.info(f"Using cached historical data for {symbol} ({interval})")
return cached_data
try:
# Build URL for klines endpoint
url = f"{self.base_url}/klines"
params = {
"symbol": formatted_symbol,
"interval": interval,
"limit": limit
}
# Make the request
response = requests.get(url, params=params)
response.raise_for_status()
# Parse the response
data = response.json()
# Create dataframe
df = pd.DataFrame(data, columns=[
"timestamp", "open", "high", "low", "close", "volume",
"close_time", "quote_asset_volume", "number_of_trades",
"taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
])
# Convert timestamp to datetime
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
# Convert price columns to float
for col in ["open", "high", "low", "close", "volume"]:
df[col] = df[col].astype(float)
# Sort by timestamp
df = df.sort_values("timestamp")
# Save to cache for future use
self._save_to_cache(df, formatted_symbol, interval)
self.last_update = datetime.now()
logger.info(f"Fetched {len(df)} candles for {symbol} ({interval})")
return df
except Exception as e:
logger.error(f"Error fetching historical data from Binance: {str(e)}")
# Return cached data if we have it, even if it's not enough
            if cached_data is not None:
                logger.warning("Using cached data instead (may be incomplete)")
                return cached_data
# Return empty dataframe on error
return pd.DataFrame()
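    # Usage sketch (illustrative): fetch roughly one day of hourly candles.
    #   fetcher = BinanceHistoricalData()
    #   df = fetcher.get_historical_candles("BTC/USDT", interval_seconds=3600, limit=24)
    # Unrecognized interval_seconds values fall back to "1h" via interval_map above.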
def _get_cache_filename(self, symbol, interval):
"""Get filename for cache file"""
return os.path.join(self.cache_dir, f"{symbol}_{interval}_candles.csv")
def _load_from_cache(self, symbol, interval):
"""Load candles from cache file"""
try:
cache_file = self._get_cache_filename(symbol, interval)
if os.path.exists(cache_file):
# For 1s interval, check if the cache is recent (less than 10 minutes old)
if interval == "1s" or interval == 1:
file_mod_time = datetime.fromtimestamp(os.path.getmtime(cache_file))
time_diff = (datetime.now() - file_mod_time).total_seconds() / 60
if time_diff > 10:
logger.info("1s cache is older than 10 minutes, skipping load")
return None
logger.info(f"Using recent 1s cache (age: {time_diff:.1f} minutes)")
df = pd.read_csv(cache_file)
df["timestamp"] = pd.to_datetime(df["timestamp"])
logger.info(f"Loaded {len(df)} candles from cache: {cache_file}")
return df
except Exception as e:
logger.error(f"Error loading cached data: {str(e)}")
return None
def _save_to_cache(self, df, symbol, interval):
"""Save candles to cache file"""
try:
cache_file = self._get_cache_filename(symbol, interval)
df.to_csv(cache_file, index=False)
logger.info(f"Saved {len(df)} candles to cache: {cache_file}")
return True
except Exception as e:
logger.error(f"Error saving to cache: {str(e)}")
return False
def get_recent_trades(self, symbol, limit=1000):
"""Get recent trades for a symbol"""
        formatted_symbol = symbol.replace("/", "").upper()
try:
url = f"{self.base_url}/trades"
params = {
"symbol": formatted_symbol,
"limit": limit
}
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
# Create dataframe
df = pd.DataFrame(data)
df["time"] = pd.to_datetime(df["time"], unit="ms")
df["price"] = df["price"].astype(float)
df["qty"] = df["qty"].astype(float)
return df
except Exception as e:
logger.error(f"Error fetching recent trades: {str(e)}")
return pd.DataFrame()
class MultiTimeframeDataInterface:
"""
Enhanced Data Interface supporting:
- Multiple trading pairs
- Multiple timeframes per pair (1s, 1m, 1h, 1d + custom)
- Technical indicators
- Cross-timeframe normalization
- Real-time data updates
"""
def __init__(self, symbol=None, timeframes=None, data_dir="data"):
"""
Initialize the data interface.
Args:
symbol (str): Trading pair symbol (e.g., "BTC/USDT")
timeframes (list): List of timeframes to use (e.g., ['1m', '5m', '1h', '4h', '1d'])
data_dir (str): Directory to store/load datasets
"""
self.symbol = symbol
self.timeframes = timeframes or ['1h', '4h', '1d']
self.data_dir = data_dir
self.scalers = {} # Store scalers for each timeframe
# Initialize the historical data fetcher
self.historical_data = BinanceHistoricalData()
# Create data directory if it doesn't exist
os.makedirs(self.data_dir, exist_ok=True)
# Initialize empty dataframes for each timeframe
self.dataframes = {tf: None for tf in self.timeframes}
# Store timestamps of last updates per timeframe
self.last_updates = {tf: None for tf in self.timeframes}
# Timeframe mapping (string to seconds)
self.timeframe_to_seconds = {
'1s': 1,
'1m': 60,
'5m': 300,
'15m': 900,
'30m': 1800,
'1h': 3600,
'4h': 14400,
'1d': 86400
}
logger.info(f"MultiTimeframeDataInterface initialized for {symbol} with timeframes {timeframes}")
def get_data(self, timeframe='1h', n_candles=1000, refresh=False, add_indicators=True):
"""
Fetch historical price data for a given timeframe with optional indicators.
Args:
timeframe (str): Timeframe to fetch data for
n_candles (int): Number of candles to fetch
refresh (bool): Force refresh of the data
add_indicators (bool): Whether to add technical indicators
Returns:
pd.DataFrame: DataFrame with OHLCV data and indicators
"""
        # Serve the in-memory copy if it was refreshed less than a minute ago
        current_time = datetime.now()
        if (not refresh and
            self.dataframes.get(timeframe) is not None and
            self.last_updates.get(timeframe) is not None and
            (current_time - self.last_updates[timeframe]).total_seconds() < 60):
            logger.info(f"Using cached data for {self.symbol} {timeframe}")
            return self.dataframes[timeframe]
interval_seconds = self.timeframe_to_seconds.get(timeframe, 3600)
# Fetch data
df = self.historical_data.get_historical_candles(
symbol=self.symbol,
interval_seconds=interval_seconds,
limit=n_candles
)
if df is None or df.empty:
logger.error(f"No data available for {self.symbol} {timeframe}")
return None
# Add indicators if requested
if add_indicators:
df = self.add_indicators(df)
# Store in cache
self.dataframes[timeframe] = df
self.last_updates[timeframe] = current_time
logger.info(f"Fetched and processed {len(df)} candles for {self.symbol} {timeframe}")
return df
def add_indicators(self, df):
"""
Add comprehensive technical indicators to the dataframe.
Args:
df (pd.DataFrame): DataFrame with OHLCV data
Returns:
pd.DataFrame: DataFrame with added technical indicators
"""
# Make a copy to avoid modifying the original
df_copy = df.copy()
# Basic price indicators
df_copy['returns'] = df_copy['close'].pct_change()
df_copy['log_returns'] = np.log(df_copy['close'] / df_copy['close'].shift(1))
# Moving Averages
df_copy['sma_7'] = ta.trend.sma_indicator(df_copy['close'], window=7)
df_copy['sma_25'] = ta.trend.sma_indicator(df_copy['close'], window=25)
df_copy['sma_99'] = ta.trend.sma_indicator(df_copy['close'], window=99)
df_copy['ema_9'] = ta.trend.ema_indicator(df_copy['close'], window=9)
df_copy['ema_21'] = ta.trend.ema_indicator(df_copy['close'], window=21)
# MACD
macd = ta.trend.MACD(df_copy['close'])
df_copy['macd'] = macd.macd()
df_copy['macd_signal'] = macd.macd_signal()
df_copy['macd_diff'] = macd.macd_diff()
# RSI
df_copy['rsi'] = ta.momentum.rsi(df_copy['close'], window=14)
# Bollinger Bands
bollinger = ta.volatility.BollingerBands(df_copy['close'])
df_copy['bb_high'] = bollinger.bollinger_hband()
df_copy['bb_low'] = bollinger.bollinger_lband()
df_copy['bb_pct'] = bollinger.bollinger_pband()
# Stochastic Oscillator
stoch = ta.momentum.StochasticOscillator(df_copy['high'], df_copy['low'], df_copy['close'])
df_copy['stoch_k'] = stoch.stoch()
df_copy['stoch_d'] = stoch.stoch_signal()
# ATR - Average True Range
df_copy['atr'] = ta.volatility.average_true_range(df_copy['high'], df_copy['low'], df_copy['close'], window=14)
# Money Flow Index
df_copy['mfi'] = ta.volume.money_flow_index(df_copy['high'], df_copy['low'], df_copy['close'], df_copy['volume'], window=14)
# OBV - On-Balance Volume
df_copy['obv'] = ta.volume.on_balance_volume(df_copy['close'], df_copy['volume'])
# Ichimoku Cloud
ichimoku = ta.trend.IchimokuIndicator(df_copy['high'], df_copy['low'])
df_copy['ichimoku_a'] = ichimoku.ichimoku_a()
df_copy['ichimoku_b'] = ichimoku.ichimoku_b()
df_copy['ichimoku_base'] = ichimoku.ichimoku_base_line()
df_copy['ichimoku_conv'] = ichimoku.ichimoku_conversion_line()
# ADX - Average Directional Index
adx = ta.trend.ADXIndicator(df_copy['high'], df_copy['low'], df_copy['close'])
df_copy['adx'] = adx.adx()
df_copy['adx_pos'] = adx.adx_pos()
df_copy['adx_neg'] = adx.adx_neg()
        # VWAP - Volume Weighted Average Price
        # Custom calculation since the ta library doesn't include VWAP.
        # Note: the cumulative sums run over the whole frame without session
        # resets, so this is a running VWAP rather than a true intraday VWAP.
        df_copy['vwap'] = (df_copy['volume'] * (df_copy['high'] + df_copy['low'] + df_copy['close']) / 3).cumsum() / df_copy['volume'].cumsum()
        # Fill NaN values (fillna(method='bfill') is deprecated in recent pandas)
        df_copy = df_copy.bfill().fillna(0)
return df_copy
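    # Example (illustrative): columns added by add_indicators include
    # 'returns', 'log_returns', 'sma_7', 'ema_9', 'macd', 'rsi', 'bb_pct',
    # 'stoch_k', 'atr', 'mfi', 'obv', 'ichimoku_a', 'adx', and 'vwap':
    #   df = interface.get_data('1h', n_candles=500, add_indicators=False)
    #   df_ind = interface.add_indicators(df)
    #   assert 'rsi' in df_ind.columns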
def get_multi_timeframe_data(self, timeframes=None, n_candles=1000, refresh=False, add_indicators=True):
"""
Fetch data for multiple timeframes.
Args:
timeframes (list): List of timeframes to fetch
n_candles (int): Number of candles to fetch for each timeframe
refresh (bool): Force refresh of the data
add_indicators (bool): Whether to add technical indicators
Returns:
dict: Dictionary of dataframes indexed by timeframe
"""
if timeframes is None:
timeframes = self.timeframes
result = {}
for tf in timeframes:
# For higher timeframes, we need fewer candles
tf_candles = n_candles
if tf == '4h':
tf_candles = max(250, n_candles // 4)
elif tf == '1d':
tf_candles = max(100, n_candles // 24)
df = self.get_data(timeframe=tf, n_candles=tf_candles, refresh=refresh, add_indicators=add_indicators)
if df is not None and not df.empty:
result[tf] = df
return result
def prepare_training_data(self, window_size=20, train_ratio=0.8, refresh=False):
"""
Prepare training data from multiple timeframes.
Args:
window_size (int): Size of the sliding window
train_ratio (float): Ratio of data to use for training
refresh (bool): Whether to refresh the data
Returns:
tuple: (X_train, y_train, X_val, y_val, train_prices, val_prices)
"""
# Get data for all timeframes
data_dict = self.get_multi_timeframe_data(refresh=refresh)
if not data_dict:
logger.error("Failed to fetch data for any timeframe")
return None, None, None, None, None, None
# Align all dataframes by timestamp
all_dfs = list(data_dict.values())
min_date = max([df['timestamp'].min() for df in all_dfs])
max_date = min([df['timestamp'].max() for df in all_dfs])
aligned_dfs = {}
for tf, df in data_dict.items():
aligned_df = df[(df['timestamp'] >= min_date) & (df['timestamp'] <= max_date)]
aligned_dfs[tf] = aligned_df
        # Choose the lowest available timeframe as the reference for time alignment
        # (keyed on aligned_dfs, since some configured timeframes may have failed to load)
        reference_tf = min(aligned_dfs.keys(), key=lambda x: self.timeframe_to_seconds.get(x, 3600))
        reference_df = aligned_dfs[reference_tf]
# Create sliding windows for each timeframe
X_dict = {}
for tf, df in aligned_dfs.items():
# Drop timestamp and create numeric features
features = df.drop('timestamp', axis=1).values
# Ensure the feature array is 3D: [samples, window, features]
X = np.array([features[i:i+window_size] for i in range(len(features)-window_size)])
X_dict[tf] = X
# Create target labels based on future price movements
reference_prices = reference_df['close'].values
future_prices = reference_prices[window_size:]
current_prices = reference_prices[window_size-1:-1]
# Calculate returns
returns = (future_prices - current_prices) / current_prices
        # Create labels: 0=SELL, 1=HOLD, 2=BUY
        threshold = 0.0005  # 0.05% threshold
        y = np.ones(len(returns), dtype=int)  # default to HOLD
        y[returns > threshold] = 2   # BUY
        y[returns < -threshold] = 0  # SELL
# Split into training and validation sets
split_idx = int(len(y) * train_ratio)
X_train_dict = {tf: X[:split_idx] for tf, X in X_dict.items()}
X_val_dict = {tf: X[split_idx:] for tf, X in X_dict.items()}
y_train = y[:split_idx]
y_val = y[split_idx:]
train_prices = reference_prices[window_size-1:window_size-1+split_idx]
val_prices = reference_prices[window_size-1+split_idx:window_size-1+len(y)]
logger.info(f"Prepared training data - Train: {len(y_train)}, Val: {len(y_val)}")
return X_train_dict, y_train, X_val_dict, y_val, train_prices, val_prices
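    # Worked example of the labeling rule above (threshold = 0.0005): with a
    # current price of 100.00, a next price of 100.10 gives a return of 0.001
    # (> threshold, so label 2 = BUY); 99.92 gives -0.0008 (label 0 = SELL);
    # 100.03 gives 0.0003 (inside the band, label 1 = HOLD).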
def normalize_data(self, data_dict, fit=True):
"""
Normalize data across all timeframes.
Args:
data_dict (dict): Dictionary of data arrays by timeframe
fit (bool): Whether to fit new scalers or use existing ones
Returns:
dict: Dictionary of normalized data arrays
"""
result = {}
for tf, data in data_dict.items():
# For 3D data [samples, window, features]
if len(data.shape) == 3:
samples, window, features = data.shape
reshaped = data.reshape(-1, features)
if fit or tf not in self.scalers:
self.scalers[tf] = MinMaxScaler()
normalized = self.scalers[tf].fit_transform(reshaped)
else:
normalized = self.scalers[tf].transform(reshaped)
result[tf] = normalized.reshape(samples, window, features)
# For 2D data [samples, features]
elif len(data.shape) == 2:
if fit or tf not in self.scalers:
self.scalers[tf] = MinMaxScaler()
result[tf] = self.scalers[tf].fit_transform(data)
else:
result[tf] = self.scalers[tf].transform(data)
return result
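    # Shape example (illustrative): for 3D input of shape (samples, 20, 40),
    # the scaler is fitted on a flattened (samples*20, 40) view and the output
    # is reshaped back, so each feature column is scaled to [0, 1] across all
    # windows of that timeframe:
    #   norm = interface.normalize_data({'1h': X_train_dict['1h']}, fit=True)
    #   norm['1h'].shape == X_train_dict['1h'].shape  # True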
def get_realtime_features(self, timeframes=None, window_size=20):
"""
Get the most recent data for real-time prediction.
Args:
timeframes (list): List of timeframes to use
window_size (int): Size of the sliding window
Returns:
dict: Dictionary of feature arrays for the latest window
"""
if timeframes is None:
timeframes = self.timeframes
# Get fresh data
data_dict = self.get_multi_timeframe_data(timeframes=timeframes, refresh=True)
result = {}
for tf, df in data_dict.items():
if len(df) < window_size:
logger.warning(f"Not enough data for {tf} (need {window_size}, got {len(df)})")
continue
# Get the latest window
latest_data = df.tail(window_size).drop('timestamp', axis=1).values
# Add extra dimension to match model input shape [1, window_size, features]
result[tf] = latest_data.reshape(1, window_size, -1)
# Apply normalization using existing scalers
if self.scalers:
result = self.normalize_data(result, fit=False)
return result
def calculate_pnl(self, predictions, prices, position_size=1.0, fee_rate=0.0002):
"""
Calculate PnL and win rate from predictions.
Args:
predictions (np.ndarray): Array of predicted actions (0=SELL, 1=HOLD, 2=BUY)
prices (np.ndarray): Array of prices
position_size (float): Size of each position
fee_rate (float): Trading fee rate (default: 0.0002 for 0.02% per trade)
Returns:
tuple: (total_pnl, win_rate, trades)
"""
if len(predictions) < 2 or len(prices) < 2:
return 0.0, 0.0, []
# Ensure arrays are the same length
min_len = min(len(predictions), len(prices)-1)
actions = predictions[:min_len]
pnl = 0.0
wins = 0
trades = []
for i in range(min_len):
current_price = prices[i]
next_price = prices[i+1]
action = actions[i]
# Skip HOLD actions
if action == 1:
continue
price_change = (next_price - current_price) / current_price
if action == 2: # BUY
# Calculate raw PnL
raw_pnl = price_change * position_size
# Calculate fees (entry and exit)
entry_fee = position_size * fee_rate
exit_fee = position_size * (1 + price_change) * fee_rate
total_fees = entry_fee + exit_fee
# Net PnL after fees
trade_pnl = raw_pnl - total_fees
trade_type = 'BUY'
is_win = trade_pnl > 0
elif action == 0: # SELL
# Calculate raw PnL
raw_pnl = -price_change * position_size
# Calculate fees (entry and exit)
entry_fee = position_size * fee_rate
exit_fee = position_size * (1 - price_change) * fee_rate
total_fees = entry_fee + exit_fee
# Net PnL after fees
trade_pnl = raw_pnl - total_fees
trade_type = 'SELL'
is_win = trade_pnl > 0
else:
continue
pnl += trade_pnl
wins += int(is_win)
trades.append({
'type': trade_type,
'entry': float(current_price), # Ensure serializable
'exit': float(next_price),
'raw_pnl': float(raw_pnl),
'fees': float(total_fees),
'pnl': float(trade_pnl),
'win': bool(is_win),
'timestamp': datetime.now().isoformat() # Add timestamp
})
win_rate = wins / len(trades) if trades else 0.0
return float(pnl), float(win_rate), trades
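# Worked fee example for calculate_pnl (position_size=1.0, fee_rate=0.0002):
# a BUY at 100.00 exiting at 101.00 has raw_pnl = 0.01, entry_fee = 0.0002,
# exit_fee = 1.01 * 0.0002 = 0.000202, so trade_pnl = 0.01 - 0.000402 = 0.009598.
# The same 1% upward move against a SELL yields -0.01 - 0.000398 = -0.010398.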
# Configure logging with a more detailed format
logging.basicConfig(
    level=logging.INFO,  # set to logging.DEBUG for more detailed logs
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('realtime_chart.log')
]
)
logger = logging.getLogger(__name__)
# Neural Network integration (conditional import)
NN_ENABLED = os.environ.get('ENABLE_NN_MODELS', '0') == '1'
nn_orchestrator = None
nn_inference_thread = None
if NN_ENABLED:
try:
import sys
# Add project root to sys.path if needed
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
sys.path.append(project_root)
from NN.main import NeuralNetworkOrchestrator
logger.info("Neural Network module enabled")
except ImportError as e:
logger.warning(f"Failed to import Neural Network module, disabling NN features: {str(e)}")
NN_ENABLED = False
# NN utility functions
def setup_neural_network():
"""Initialize the neural network components if enabled"""
global nn_orchestrator, NN_ENABLED
if not NN_ENABLED:
return False
try:
# Get configuration from environment variables or use defaults
symbol = os.environ.get('NN_SYMBOL', 'ETH/USDT')
timeframes = os.environ.get('NN_TIMEFRAMES', '1m,5m,1h,4h,1d').split(',')
output_size = int(os.environ.get('NN_OUTPUT_SIZE', '3')) # 3 for BUY/HOLD/SELL
# Configure the orchestrator
config = {
'symbol': symbol,
'timeframes': timeframes,
'window_size': int(os.environ.get('NN_WINDOW_SIZE', '20')),
'n_features': 5, # OHLCV
'output_size': output_size,
'model_dir': 'NN/models/saved',
'data_dir': 'NN/data'
}
# Initialize the orchestrator
logger.info(f"Initializing Neural Network Orchestrator with config: {config}")
nn_orchestrator = NeuralNetworkOrchestrator(config)
# Load the model
model_loaded = nn_orchestrator.load_model()
if not model_loaded:
logger.warning("Failed to load neural network model. Using untrained model.")
return model_loaded
except Exception as e:
logger.error(f"Error setting up neural network: {str(e)}")
NN_ENABLED = False
return False
def start_nn_inference_thread(interval_seconds):
"""Start a background thread to periodically run inference with the neural network"""
global nn_inference_thread
if not NN_ENABLED or nn_orchestrator is None:
logger.warning("Cannot start inference thread - Neural Network not enabled or initialized")
return False
def inference_worker():
"""Worker function for the inference thread"""
model_type = os.environ.get('NN_MODEL_TYPE', 'cnn')
timeframe = os.environ.get('NN_TIMEFRAME', '1h')
logger.info(f"Starting neural network inference thread with {interval_seconds}s interval")
logger.info(f"Using model type: {model_type}, timeframe: {timeframe}")
# Wait a bit for charts to initialize
time.sleep(5)
# Track active charts
active_charts = []
while True:
try:
# Find active charts if we don't have them yet
if not active_charts and 'charts' in globals():
active_charts = globals()['charts']
logger.info(f"Found {len(active_charts)} active charts for NN signals")
# Run inference
result = nn_orchestrator.run_inference_pipeline(
model_type=model_type,
timeframe=timeframe
)
if result:
# Log the result
logger.info(f"Neural network inference result: {result}")
# Add signal to charts
if active_charts:
try:
if 'action' in result:
action = result['action']
timestamp = datetime.fromisoformat(result['timestamp'].replace('Z', '+00:00'))
# Get probability if available
probability = None
if 'probability' in result:
probability = result['probability']
elif 'probabilities' in result:
probability = result['probabilities'].get(action, None)
# Add signal to each chart
for chart in active_charts:
if hasattr(chart, 'add_nn_signal'):
chart.add_nn_signal(action, timestamp, probability)
except Exception as e:
logger.error(f"Error adding NN signal to chart: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Sleep for the interval
time.sleep(interval_seconds)
except Exception as e:
logger.error(f"Error in inference thread: {str(e)}")
import traceback
logger.error(traceback.format_exc())
time.sleep(5) # Wait a bit before retrying
# Create and start the thread
nn_inference_thread = threading.Thread(target=inference_worker, daemon=True)
nn_inference_thread.start()
return True
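# Usage sketch (illustrative; requires ENABLE_NN_MODELS=1 and the NN package
# on the import path):
#   if setup_neural_network():
#       start_nn_inference_thread(60)  # run inference once a minute
# The worker thread is a daemon, so it will not block interpreter shutdown.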
# Try to get local timezone, default to Sofia/EET if not available
try:
local_timezone = tzlocal.get_localzone()
    # Get timezone name safely: pytz timezones expose .zone, zoneinfo exposes .key
    try:
        if hasattr(local_timezone, 'zone'):
            tz_name = local_timezone.zone
        elif hasattr(local_timezone, 'key'):
            tz_name = local_timezone.key
        else:
            tz_name = str(local_timezone)
    except Exception:
        tz_name = "Local"
logger.info(f"Detected local timezone: {local_timezone} ({tz_name})")
except Exception as e:
logger.warning(f"Could not detect local timezone: {str(e)}. Defaulting to Sofia/EET")
local_timezone = pytz.timezone('Europe/Sofia')
tz_name = "Europe/Sofia"
def convert_to_local_time(timestamp):
"""Convert timestamp to local timezone"""
try:
if isinstance(timestamp, pd.Timestamp):
dt = timestamp.to_pydatetime()
elif isinstance(timestamp, np.datetime64):
dt = pd.Timestamp(timestamp).to_pydatetime()
elif isinstance(timestamp, str):
dt = pd.to_datetime(timestamp).to_pydatetime()
else:
dt = timestamp
# If datetime is naive (no timezone), assume it's UTC
if dt.tzinfo is None:
dt = dt.replace(tzinfo=pytz.UTC)
# Convert to local timezone
local_dt = dt.astimezone(local_timezone)
return local_dt
except Exception as e:
logger.error(f"Error converting timestamp to local time: {str(e)}")
return timestamp
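# Example (illustrative): naive timestamps are assumed to be UTC, so with a
# local timezone of Europe/Sofia (UTC+2 in winter):
#   convert_to_local_time(pd.Timestamp("2024-01-01 12:00:00"))
#   # -> 2024-01-01 14:00:00+02:00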
# Initialize TimescaleDB handler - only once, at module level
timescaledb_handler = TimescaleDBHandler() if TIMESCALEDB_ENABLED else None
class TickStorage:
def __init__(self, symbol, timeframes=None, use_timescaledb=False):
"""Initialize the tick storage for a specific symbol"""
self.symbol = symbol
self.timeframes = timeframes or ["1s", "5m", "15m", "1h", "4h", "1d"]
self.ticks = []
self.candles = {tf: [] for tf in self.timeframes}
self.current_candle = {tf: None for tf in self.timeframes}
self.last_candle_timestamp = {tf: None for tf in self.timeframes}
self.cache_dir = os.path.join(os.getcwd(), "cache", symbol.replace("/", ""))
        self.cache_path = os.path.join(self.cache_dir, f"{symbol.replace('/', '')}_ticks.json")
self.use_timescaledb = use_timescaledb
self.max_ticks = 10000 # Maximum number of ticks to store in memory
# Create cache directory if it doesn't exist
os.makedirs(self.cache_dir, exist_ok=True)
logger.info(f"Creating new tick storage for {symbol} with timeframes {self.timeframes}")
logger.info(f"Cache directory: {self.cache_dir}")
logger.info(f"Cache file: {self.cache_path}")
if use_timescaledb:
print(f"TickStorage: TimescaleDB integration is ENABLED for {symbol}")
else:
logger.info(f"TickStorage: TimescaleDB integration is DISABLED for {symbol}")
def _save_to_cache(self):
"""Save ticks to a cache file"""
try:
            # Only save the latest 5000 ticks to avoid giant files
            ticks_to_save = self.ticks[-5000:] if len(self.ticks) > 5000 else self.ticks
            with open(self.cache_path, 'w') as f:
                # Timestamps are pd.Timestamp objects, which json cannot encode natively
                json.dump(ticks_to_save, f, default=str)
except Exception as e:
logger.error(f"Error saving ticks to cache: {e}")
def _load_from_cache(self):
"""Load ticks from cache if available"""
if os.path.exists(self.cache_path):
try:
# Check if the cache file is recent (< 10 minutes old)
cache_age = time.time() - os.path.getmtime(self.cache_path)
if cache_age > 600: # 10 minutes in seconds
logger.warning(f"Cache file is {cache_age:.1f} seconds old (>10 min). Not using it.")
return False
                with open(self.cache_path, 'r') as f:
                    cached_ticks = json.load(f)
                if cached_ticks:
                    # Timestamps were serialized as strings; convert them back
                    # so candle rebuilding can treat them as pd.Timestamp
                    for cached_tick in cached_ticks:
                        cached_tick['timestamp'] = pd.Timestamp(cached_tick['timestamp'])
                    self.ticks = cached_ticks
                    logger.info(f"Loaded {len(cached_ticks)} ticks from cache")
                    return True
except Exception as e:
logger.error(f"Error loading ticks from cache: {e}")
return False
def add_tick(self, tick=None, price=None, volume=None, timestamp=None):
"""
Add a tick to the storage and update candles for all timeframes
Args:
tick (dict, optional): A tick object containing price, quantity and timestamp
price (float, optional): Price of the tick (used in older interface)
volume (float, optional): Volume of the tick (used in older interface)
timestamp (datetime, optional): Timestamp of the tick (used in older interface)
"""
        # Handle tick as a dict or separate parameters for backward compatibility
        if tick is not None and isinstance(tick, dict):
            # Using the new interface with a tick object
            price = tick['price']
            volume = tick.get('quantity', 0)
            timestamp = tick['timestamp']
        elif price is None:
            logger.error("Invalid tick: must provide either a tick dict or price")
            return
        # Normalize the timestamp, defaulting to "now" when not supplied
        if timestamp is None:
            timestamp = pd.Timestamp.now()
        elif not isinstance(timestamp, pd.Timestamp):
            timestamp = pd.Timestamp(timestamp)
# Create tick object
tick_obj = {
'price': price,
'quantity': volume if volume is not None else 0,
'timestamp': timestamp
}
# Add to the list of ticks
self.ticks.append(tick_obj)
# Limit the number of ticks to avoid memory issues
if len(self.ticks) > self.max_ticks:
self.ticks = self.ticks[-self.max_ticks:]
# Update candles for all timeframes
for timeframe in self.timeframes:
if timeframe == "1s":
self._update_1s_candle(tick_obj)
else:
self._update_candles_for_timeframe(timeframe, tick_obj)
# Cache to disk periodically
self._try_cache_ticks()
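    # Both call styles are supported (values illustrative):
    #   storage.add_tick({'price': 42000.5, 'quantity': 0.01,
    #                     'timestamp': pd.Timestamp.now()})
    #   storage.add_tick(price=42000.5, volume=0.01)  # legacy interface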
def _update_1s_candle(self, tick):
"""Update the 1-second candle with the new tick"""
        # Floor the tick timestamp to the start of its second
        tick_timestamp = tick['timestamp']
        candle_timestamp = pd.Timestamp(tick_timestamp).floor('s')
# Check if we need to create a new candle
if self.current_candle["1s"] is None or self.current_candle["1s"]["timestamp"] != candle_timestamp:
# If we have a current candle, finalize it and add to candles list
if self.current_candle["1s"] is not None:
# Add the completed candle to the list
self.candles["1s"].append(self.current_candle["1s"])
# Limit the number of stored candles to prevent memory issues
if len(self.candles["1s"]) > 3600: # Keep last hour of 1s candles
self.candles["1s"] = self.candles["1s"][-3600:]
                # Store in TimescaleDB if enabled (guard against a missing handler)
                if self.use_timescaledb and timescaledb_handler:
                    timescaledb_handler.upsert_candle(
                        self.symbol, "1s", self.current_candle["1s"]
                    )
# Log completed candle for debugging
logger.debug(f"Completed 1s candle: {self.current_candle['1s']['timestamp']} - Close: {self.current_candle['1s']['close']}")
# Create a new candle
self.current_candle["1s"] = {
"timestamp": candle_timestamp,
"open": float(tick["price"]),
"high": float(tick["price"]),
"low": float(tick["price"]),
"close": float(tick["price"]),
"volume": float(tick["quantity"]) if "quantity" in tick else 0.0
}
# Update last candle timestamp
self.last_candle_timestamp["1s"] = candle_timestamp
logger.debug(f"Created new 1s candle at {candle_timestamp}")
else:
# Update the current candle
current = self.current_candle["1s"]
price = float(tick["price"])
# Update high and low
if price > current["high"]:
current["high"] = price
if price < current["low"]:
current["low"] = price
# Update close price and add volume
current["close"] = price
current["volume"] += float(tick["quantity"]) if "quantity" in tick else 0.0
def _update_candles_for_timeframe(self, timeframe, tick):
"""Update candles for a specific timeframe"""
# Skip 1s as it's handled separately
if timeframe == "1s":
return
# Convert timeframe to seconds
timeframe_seconds = self._timeframe_to_seconds(timeframe)
# Get the timestamp truncated to the timeframe interval
# e.g., for a 5m candle, the timestamp should be truncated to the nearest 5-minute mark
# Convert timestamp to datetime if it's not already
tick_timestamp = tick['timestamp']
if isinstance(tick_timestamp, pd.Timestamp):
ts = tick_timestamp
else:
ts = pd.Timestamp(tick_timestamp)
# Truncate timestamp to nearest timeframe interval
timestamp = pd.Timestamp(
int(ts.timestamp() // timeframe_seconds * timeframe_seconds * 1_000_000_000)
)
# Get the current candle for this timeframe
current_candle = self.current_candle[timeframe]
# If we have no current candle or the timestamp is different (new candle)
if current_candle is None or current_candle['timestamp'] != timestamp:
# If we have a current candle, add it to the candles list
if current_candle:
self.candles[timeframe].append(current_candle)
                # Save to TimescaleDB if enabled (guard against a missing handler)
                if self.use_timescaledb and timescaledb_handler:
                    timescaledb_handler.upsert_candle(self.symbol, timeframe, current_candle)
# Create a new candle
current_candle = {
'timestamp': timestamp,
'open': tick['price'],
'high': tick['price'],
'low': tick['price'],
'close': tick['price'],
'volume': tick.get('quantity', 0)
}
# Update current candle
self.current_candle[timeframe] = current_candle
self.last_candle_timestamp[timeframe] = timestamp
else:
# Update existing candle
current_candle['high'] = max(current_candle['high'], tick['price'])
current_candle['low'] = min(current_candle['low'], tick['price'])
current_candle['close'] = tick['price']
current_candle['volume'] += tick.get('quantity', 0)
# Limit the number of candles to avoid memory issues
max_candles = 1000
if len(self.candles[timeframe]) > max_candles:
self.candles[timeframe] = self.candles[timeframe][-max_candles:]
def _timeframe_to_seconds(self, timeframe):
"""Convert a timeframe string (e.g., '1m', '1h') to seconds"""
if timeframe == "1s":
return 1
try:
# Extract the number and unit
match = re.match(r'(\d+)([smhdw])', timeframe)
if not match:
return None
num, unit = match.groups()
num = int(num)
# Convert to seconds
if unit == 's':
return num
elif unit == 'm':
return num * 60
elif unit == 'h':
return num * 3600
elif unit == 'd':
return num * 86400
elif unit == 'w':
return num * 604800
return None
        except Exception:
            return None
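    # Examples: _timeframe_to_seconds("15m") -> 900, "4h" -> 14400,
    # "1w" -> 604800; malformed strings such as "90x" return None.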
def get_candles(self, timeframe, limit=None):
"""Get candles for a given timeframe"""
if timeframe in self.candles:
candles = self.candles[timeframe]
# Add the current candle if it exists and isn't None
if timeframe in self.current_candle and self.current_candle[timeframe] is not None:
# Make a copy of the current candle
current_candle_copy = self.current_candle[timeframe].copy()
# Check if the current candle is newer than the last candle in the list
if not candles or current_candle_copy["timestamp"] > candles[-1]["timestamp"]:
candles = candles + [current_candle_copy]
# Apply limit if provided
if limit and len(candles) > limit:
return candles[-limit:]
return candles
return []
def get_last_price(self):
"""Get the last known price"""
if self.ticks:
return float(self.ticks[-1]["price"])
return None
def load_historical_data(self, symbol, limit=1000):
"""Load historical data for all timeframes"""
logger.info(f"Starting historical data load for {symbol} with limit {limit}")
# Clear existing data
self.ticks = []
self.candles = {tf: [] for tf in self.timeframes}
self.current_candle = {tf: None for tf in self.timeframes}
# Try to load ticks from cache first
logger.info("Attempting to load from cache...")
cache_loaded = self._load_from_cache()
if cache_loaded:
logger.info("Successfully loaded data from cache")
else:
logger.info("No valid cache data found")
# Check if we have TimescaleDB enabled
if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled:
logger.info("Attempting to fetch historical data from TimescaleDB")
loaded_from_db = False
# Load candles for each timeframe from TimescaleDB
for tf in self.timeframes:
try:
candles = timescaledb_handler.fetch_candles(symbol, tf, limit)
if candles:
self.candles[tf] = candles
loaded_from_db = True
logger.info(f"Loaded {len(candles)} {tf} candles from TimescaleDB")
else:
logger.info(f"No {tf} candles found in TimescaleDB")
except Exception as e:
logger.error(f"Error loading {tf} candles from TimescaleDB: {str(e)}")
            if loaded_from_db:
                logger.info("Successfully loaded historical data from TimescaleDB")
                # Seed current candles before the early return so live updates
                # continue from the last stored candle of each timeframe
                for tf in self.timeframes:
                    if self.candles[tf]:
                        self.current_candle[tf] = self.candles[tf][-1].copy()
                        self.last_candle_timestamp[tf] = self.current_candle[tf]["timestamp"]
                return True
else:
logger.info("TimescaleDB not available or disabled")
# If no TimescaleDB data and no cache, we need to get from Binance API
if not cache_loaded:
logger.info("Loading data from Binance API...")
# Create a BinanceHistoricalData instance
historical_data = BinanceHistoricalData()
# Load data for each timeframe
success_count = 0
for tf in self.timeframes:
if tf != "1s": # Skip 1s since we'll generate it from ticks
try:
logger.info(f"Fetching {tf} candles for {symbol}...")
df = historical_data.get_historical_candles(symbol, self._timeframe_to_seconds(tf), limit)
if df is not None and not df.empty:
logger.info(f"Loaded {len(df)} {tf} candles from Binance API")
# Convert to our candle format and store
candles = []
for _, row in df.iterrows():
candle = {
'timestamp': row['timestamp'],
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close'],
'volume': row['volume']
}
candles.append(candle)
# Also save to TimescaleDB if enabled
if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled:
timescaledb_handler.upsert_candle(symbol, tf, candle)
self.candles[tf] = candles
success_count += 1
else:
logger.warning(f"No data returned for {tf} candles")
except Exception as e:
logger.error(f"Error loading {tf} candles: {str(e)}")
import traceback
logger.error(traceback.format_exc())
logger.info(f"Successfully loaded {success_count} timeframes from Binance API")
# For 1s, load from API if possible or compute from first available timeframe
if "1s" in self.timeframes:
logger.info("Loading 1s candles...")
# Try to get 1s data from Binance
try:
df_1s = historical_data.get_historical_candles(symbol, 1, 300) # Only need recent 1s data
if df_1s is not None and not df_1s.empty:
logger.info(f"Loaded {len(df_1s)} recent 1s candles from Binance API")
# Convert to our candle format and store
candles_1s = []
for _, row in df_1s.iterrows():
candle = {
'timestamp': row['timestamp'],
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close'],
'volume': row['volume']
}
candles_1s.append(candle)
# Also save to TimescaleDB if enabled
if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled:
timescaledb_handler.upsert_candle(symbol, "1s", candle)
self.candles["1s"] = candles_1s
except Exception as e:
logger.error(f"Error loading 1s candles: {str(e)}")
# If 1s data not available or failed to load, approximate from 1m data
if not self.candles.get("1s"):
logger.info("1s data not available, trying to approximate from 1m data...")
# If 1s data not available, we can approximate from 1m data
if "1m" in self.timeframes and self.candles["1m"]:
# For demonstration, just use the 1m candles as placeholders for 1s
# In a real implementation, you might want more sophisticated interpolation
logger.info("Using 1m candles as placeholders for 1s timeframe")
self.candles["1s"] = []
# Take the most recent 5 minutes of 1m candles
recent_1m = self.candles["1m"][-5:] if self.candles["1m"] else []
logger.info(f"Creating 1s approximations from {len(recent_1m)} 1m candles")
for candle_1m in recent_1m:
# Create 60 1s candles for each 1m candle
ts_base = candle_1m["timestamp"].timestamp()
for i in range(60):
# Create a 1s candle with interpolated values
candle_1s = {
'timestamp': pd.Timestamp(int((ts_base + i) * 1_000_000_000)),
'open': candle_1m['open'],
'high': candle_1m['high'],
'low': candle_1m['low'],
'close': candle_1m['close'],
'volume': candle_1m['volume'] / 60.0 # Distribute volume evenly
}
self.candles["1s"].append(candle_1s)
# Also save to TimescaleDB if enabled
if self.use_timescaledb and timescaledb_handler and timescaledb_handler.enabled:
timescaledb_handler.upsert_candle(symbol, "1s", candle_1s)
logger.info(f"Created {len(self.candles['1s'])} approximated 1s candles")
else:
logger.warning("No 1m data available to approximate 1s candles from")
# Set the last candle of each timeframe as the current candle
for tf in self.timeframes:
if self.candles[tf]:
self.current_candle[tf] = self.candles[tf][-1].copy()
self.last_candle_timestamp[tf] = self.current_candle[tf]["timestamp"]
logger.debug(f"Set current candle for {tf}: {self.current_candle[tf]['timestamp']}")
# If we loaded ticks from cache, rebuild candles
if cache_loaded:
logger.info("Rebuilding candles from cached ticks...")
# Clear candles
self.candles = {tf: [] for tf in self.timeframes}
self.current_candle = {tf: None for tf in self.timeframes}
# Process each tick to rebuild the candles
for tick in self.ticks:
for tf in self.timeframes:
if tf == "1s":
self._update_1s_candle(tick)
else:
self._update_candles_for_timeframe(tf, tick)
logger.info("Finished rebuilding candles from ticks")
# Log final results
for tf in self.timeframes:
count = len(self.candles[tf])
logger.info(f"Final {tf} candle count: {count}")
has_data = cache_loaded or any(self.candles[tf] for tf in self.timeframes)
logger.info(f"Historical data loading completed. Has data: {has_data}")
return has_data
def _try_cache_ticks(self):
"""Try to save ticks to cache periodically"""
# Only save to cache every 100 ticks to avoid excessive disk I/O
if len(self.ticks) % 100 == 0:
self._save_to_cache()
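# Usage sketch (illustrative; assumes the module-level timescaledb_handler has
# already been initialized):
#   storage = TickStorage("ETH/USDT", timeframes=["1s", "1m", "1h"])
#   storage.load_historical_data("ETH/USDT", limit=500)
#   storage.add_tick(price=3000.25, volume=0.5, timestamp=pd.Timestamp.now())
#   last_minute = storage.get_candles("1m", limit=60)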
class Position:
"""Represents a trading position"""
def __init__(self, action, entry_price, amount, timestamp=None, trade_id=None, fee_rate=0.0002):
self.action = action
self.entry_price = entry_price
self.amount = amount
self.entry_timestamp = timestamp or datetime.now()
self.exit_timestamp = None
self.exit_price = None
self.pnl = None
self.is_open = True
self.trade_id = trade_id or str(uuid.uuid4())[:8]
self.fee_rate = fee_rate
self.paid_fee = entry_price * amount * fee_rate # Calculate entry fee
def close(self, exit_price, exit_timestamp=None):
"""Close an open position"""
self.exit_price = exit_price
self.exit_timestamp = exit_timestamp or datetime.now()
self.is_open = False
# Calculate P&L
if self.action == "BUY":
price_diff = self.exit_price - self.entry_price
# Calculate fee for exit trade
exit_fee = exit_price * self.amount * self.fee_rate
self.paid_fee += exit_fee # Add exit fee to total paid fee
self.pnl = (price_diff * self.amount) - self.paid_fee
else: # SELL
price_diff = self.entry_price - self.exit_price
# Calculate fee for exit trade
exit_fee = exit_price * self.amount * self.fee_rate
self.paid_fee += exit_fee # Add exit fee to total paid fee
self.pnl = (price_diff * self.amount) - self.paid_fee
return self.pnl
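# Worked example for Position (fee_rate=0.0002): a BUY of 2.0 units at 100.00
# pays an entry fee of 100.00 * 2.0 * 0.0002 = 0.04; closing at 101.00 adds an
# exit fee of 101.00 * 2.0 * 0.0002 = 0.0404, so
#   pnl = (1.00 * 2.0) - (0.04 + 0.0404) = 1.9196
#   p = Position("BUY", 100.0, 2.0)
#   p.close(101.0)  # -> 1.9196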
class RealTimeChart:
def __init__(self, app=None, symbol='BTCUSDT', timeframe='1m', standalone=True, chart_title=None,
run_signal_interpreter=False, debug_mode=False, historical_candles=None,
extended_hours=False, enable_logging=True, agent=None, trading_env=None,
max_memory_usage=90, memory_check_interval=10, tick_update_interval=0.5,
chart_update_interval=1, performance_monitoring=False, show_volume=True,
show_indicators=True, custom_trades=None, port=8050, height=900, width=1200,
positions_callback=None, allow_synthetic_data=True, tick_storage=None):
"""Initialize a real-time chart with support for multiple indicators and backtesting."""
# Store parameters
self.symbol = symbol
self.timeframe = timeframe
self.debug_mode = debug_mode
self.standalone = standalone
self.chart_title = chart_title or f"{symbol} Real-Time Chart"
self.extended_hours = extended_hours
self.enable_logging = enable_logging
self.run_signal_interpreter = run_signal_interpreter
self.historical_candles = historical_candles
self.performance_monitoring = performance_monitoring
self.max_memory_usage = max_memory_usage
self.memory_check_interval = memory_check_interval
self.tick_update_interval = tick_update_interval
self.chart_update_interval = chart_update_interval
self.show_volume = show_volume
self.show_indicators = show_indicators
self.custom_trades = custom_trades
self.port = port
self.height = height
self.width = width
self.positions_callback = positions_callback
self.allow_synthetic_data = allow_synthetic_data
# Initialize interval store
self.interval_store = {'interval': 1} # Default to 1s timeframe
# Initialize trading components
self.agent = agent
self.trading_env = trading_env
# Initialize button styles for timeframe selection
self.button_style = {
'background': '#343a40',
'color': 'white',
'border': 'none',
'padding': '10px 20px',
'margin': '0 5px',
'borderRadius': '4px',
'cursor': 'pointer'
}
self.active_button_style = {
'background': '#007bff',
'color': 'white',
'border': 'none',
'padding': '10px 20px',
'margin': '0 5px',
'borderRadius': '4px',
'cursor': 'pointer',
'fontWeight': 'bold'
}
# Initialize color schemes
self.colors = {
'background': '#1e1e1e',
'text': '#ffffff',
'grid': '#333333',
'candle_up': '#26a69a',
'candle_down': '#ef5350',
'volume_up': 'rgba(38, 166, 154, 0.3)',
'volume_down': 'rgba(239, 83, 80, 0.3)',
'ma': '#ffeb3b',
'ema': '#29b6f6',
'bollinger_bands': '#ff9800',
'trades_buy': '#00e676',
'trades_sell': '#ff1744'
}
# Initialize data storage
self.all_trades = [] # Store trades
self.positions = [] # Store open positions
self.latest_price = 0.0
self.latest_volume = 0.0
self.latest_timestamp = datetime.now()
self.current_balance = 100.0 # Starting balance
self.accumulative_pnl = 0.0 # Accumulated profit/loss
# Initialize trade rate counter
self.trade_count = 0
self.start_time = time.time()
self.trades_per_second = 0
self.trades_per_minute = 0
self.trades_per_hour = 0
# Initialize trade rate tracking variables
self.trade_times = [] # Store timestamps of recent trades for rate calculation
self.last_trade_rate_calculation = datetime.now()
self.trade_rate = {"per_second": 0, "per_minute": 0, "per_hour": 0}
# Initialize interactive components
self.app = app
# Create a new app if not provided
if self.app is None and standalone:
self.app = dash.Dash(
__name__,
external_stylesheets=[dbc.themes.DARKLY],
suppress_callback_exceptions=True
)
# Initialize tick storage if not provided
if tick_storage is None:
# Check if TimescaleDB integration is enabled
use_timescaledb = TIMESCALEDB_ENABLED and timescaledb_handler is not None
# Create a new tick storage
self.tick_storage = TickStorage(
symbol=symbol,
timeframes=["1s", "1m", "5m", "15m", "1h", "4h", "1d"],
use_timescaledb=use_timescaledb
)
# Load historical data immediately for cold start
logger.info(f"Loading historical data for {symbol} during chart initialization")
try:
data_loaded = self.tick_storage.load_historical_data(symbol)
if data_loaded:
logger.info(f"Successfully loaded historical data for {symbol}")
# Log what we have
for tf in ["1s", "1m", "5m", "15m", "1h"]:
candle_count = len(self.tick_storage.candles.get(tf, []))
logger.info(f" {tf}: {candle_count} candles")
else:
logger.warning(f"Failed to load historical data for {symbol}")
except Exception as e:
logger.error(f"Error loading historical data during initialization: {str(e)}")
import traceback
logger.error(traceback.format_exc())
else:
self.tick_storage = tick_storage
# Create layout and callbacks if app is provided
if self.app is not None:
# Create the layout
self.app.layout = self._create_layout()
# Register callbacks
self._setup_callbacks()
# Log initialization
if self.enable_logging:
logger.info(f"RealTimeChart initialized: {self.symbol} ({self.timeframe}) ")
def _create_layout(self):
return html.Div([
# Header section with title and current price
html.Div([
html.H1(f"{self.symbol} Real-Time Chart", className="display-4"),
# Current price ticker
html.Div([
html.H4("Current Price:", style={"display": "inline-block", "marginRight": "10px"}),
html.H3(id="current-price", style={"display": "inline-block", "color": "#17a2b8"}),
html.Div([
html.H5("Balance:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}),
html.H5(id="current-balance", style={"display": "inline-block", "color": "#28a745"}),
], style={"display": "inline-block", "marginLeft": "40px"}),
html.Div([
html.H5("Accumulated PnL:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}),
html.H5(id="accumulated-pnl", style={"display": "inline-block", "color": "#ffc107"}),
], style={"display": "inline-block", "marginLeft": "40px"}),
# Add trade rate display
html.Div([
html.H5("Trade Rate:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}),
html.Span([
html.Span(id="trade-rate-second", style={"color": "#ff7f0e"}),
html.Span("/s, "),
html.Span(id="trade-rate-minute", style={"color": "#ff7f0e"}),
html.Span("/m, "),
html.Span(id="trade-rate-hour", style={"color": "#ff7f0e"}),
html.Span("/h")
], style={"display": "inline-block"}),
], style={"display": "inline-block", "marginLeft": "40px"}),
], style={"textAlign": "center", "margin": "20px 0"}),
], style={"textAlign": "center", "marginBottom": "20px"}),
# Add interval component for periodic updates
dcc.Interval(
id='interval-component',
interval=500, # in milliseconds
n_intervals=0
),
# Add timeframe selection buttons
html.Div([
html.Button('1s', id='btn-1s', n_clicks=0, style=self.active_button_style),
html.Button('5s', id='btn-5s', n_clicks=0, style=self.button_style),
html.Button('15s', id='btn-15s', n_clicks=0, style=self.button_style),
html.Button('1m', id='btn-1m', n_clicks=0, style=self.button_style),
html.Button('5m', id='btn-5m', n_clicks=0, style=self.button_style),
html.Button('15m', id='btn-15m', n_clicks=0, style=self.button_style),
html.Button('1h', id='btn-1h', n_clicks=0, style=self.button_style),
], style={"textAlign": "center", "marginBottom": "20px"}),
# Store for the selected timeframe
dcc.Store(id='interval-store', data={'interval': 1}),
# Chart content (without wrapper div to avoid callback issues)
dcc.Graph(id='live-chart', style={"height": "600px"}),
dcc.Graph(id='secondary-charts', style={"height": "500px"}),
html.Div(id='positions-list')
])
def _create_chart_and_controls(self):
"""Create the chart and controls for the dashboard."""
try:
# Get selected interval from the dashboard (default to 1s if not available)
interval_seconds = 1
if hasattr(self, 'interval_store') and self.interval_store:
interval_seconds = self.interval_store.get('interval', 1)
# Create chart components
chart_div = html.Div([
# Update chart with data for the selected interval
dcc.Graph(
id='live-chart',
figure=self._update_main_chart(interval_seconds),
style={"height": "600px"}
),
# Update secondary charts
dcc.Graph(
id='secondary-charts',
figure=self._update_secondary_charts(),
style={"height": "500px"}
),
# Update positions list
html.Div(
id='positions-list',
children=self._get_position_list_rows()
)
])
return chart_div
except Exception as e:
logger.error(f"Error creating chart and controls: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Return a simple error message as fallback
return html.Div(f"Error loading chart: {str(e)}", style={"color": "red", "padding": "20px"})
def _setup_callbacks(self):
"""Set up all the callbacks for the dashboard"""
# Callback for timeframe selection
@self.app.callback(
[Output('interval-store', 'data'),
Output('btn-1s', 'style'),
Output('btn-5s', 'style'),
Output('btn-15s', 'style'),
Output('btn-1m', 'style'),
Output('btn-5m', 'style'),
Output('btn-15m', 'style'),
Output('btn-1h', 'style')],
[Input('btn-1s', 'n_clicks'),
Input('btn-5s', 'n_clicks'),
Input('btn-15s', 'n_clicks'),
Input('btn-1m', 'n_clicks'),
Input('btn-5m', 'n_clicks'),
Input('btn-15m', 'n_clicks'),
Input('btn-1h', 'n_clicks')],
[dash.dependencies.State('interval-store', 'data')]
)
def update_interval(n1, n5, n15, n60, n300, n900, n3600, data):
ctx = dash.callback_context
if not ctx.triggered:
# Default state (1s selected)
return ({'interval': 1},
self.active_button_style,
self.button_style,
self.button_style,
self.button_style,
self.button_style,
self.button_style,
self.button_style)
button_id = ctx.triggered[0]['prop_id'].split('.')[0]
# Initialize all buttons to inactive
button_styles = [self.button_style] * 7
# Set the active button and interval
if button_id == 'btn-1s':
button_styles[0] = self.active_button_style
return ({'interval': 1}, *button_styles)
elif button_id == 'btn-5s':
button_styles[1] = self.active_button_style
return ({'interval': 5}, *button_styles)
elif button_id == 'btn-15s':
button_styles[2] = self.active_button_style
return ({'interval': 15}, *button_styles)
elif button_id == 'btn-1m':
button_styles[3] = self.active_button_style
return ({'interval': 60}, *button_styles)
elif button_id == 'btn-5m':
button_styles[4] = self.active_button_style
return ({'interval': 300}, *button_styles)
elif button_id == 'btn-15m':
button_styles[5] = self.active_button_style
return ({'interval': 900}, *button_styles)
elif button_id == 'btn-1h':
button_styles[6] = self.active_button_style
return ({'interval': 3600}, *button_styles)
# Default - keep current interval
current_interval = data.get('interval', 1)
# Set the appropriate button as active
if current_interval == 1:
button_styles[0] = self.active_button_style
elif current_interval == 5:
button_styles[1] = self.active_button_style
elif current_interval == 15:
button_styles[2] = self.active_button_style
elif current_interval == 60:
button_styles[3] = self.active_button_style
elif current_interval == 300:
button_styles[4] = self.active_button_style
elif current_interval == 900:
button_styles[5] = self.active_button_style
elif current_interval == 3600:
button_styles[6] = self.active_button_style
return (data, *button_styles)
# Main update callback
@self.app.callback(
[Output('live-chart', 'figure'),
Output('secondary-charts', 'figure'),
Output('positions-list', 'children'),
Output('current-price', 'children'),
Output('current-balance', 'children'),
Output('accumulated-pnl', 'children'),
Output('trade-rate-second', 'children'),
Output('trade-rate-minute', 'children'),
Output('trade-rate-hour', 'children')],
[Input('interval-component', 'n_intervals'),
Input('interval-store', 'data')]
)
def update_all(n, interval_data):
try:
# Get selected interval
interval = interval_data.get('interval', 1)
# Get updated chart figures
main_fig = self._update_main_chart(interval)
secondary_fig = self._update_secondary_charts()
# Get updated positions list
positions = self._get_position_list_rows()
# Format the current price
current_price = "$ ---.--"
if self.latest_price is not None:
current_price = f"${self.latest_price:.2f}"
# Format balance and PnL
balance_text = f"${self.current_balance:.2f}"
pnl_text = f"${self.accumulative_pnl:.2f}"
# Get trade rate statistics
trade_rate = self.calculate_trade_rate()
per_second = f"{trade_rate['per_second']:.1f}"
per_minute = f"{trade_rate['per_minute']:.1f}"
per_hour = f"{trade_rate['per_hour']:.1f}"
return main_fig, secondary_fig, positions, current_price, balance_text, pnl_text, per_second, per_minute, per_hour
except Exception as e:
logger.error(f"Error in update callback: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Return empty updates on error
return {}, {}, [], "Error", "$0.00", "$0.00", "0.0", "0.0", "0.0"
def _update_secondary_charts(self):
"""Update the secondary charts for other timeframes"""
try:
# For each timeframe, create a small chart
secondary_timeframes = ['1m', '5m', '15m', '1h']
if not all(tf in self.tick_storage.candles for tf in secondary_timeframes):
logger.warning("Not all secondary timeframes available")
# Return empty figure with a message
fig = make_subplots(rows=1, cols=4)
for i, tf in enumerate(secondary_timeframes, 1):
    # "x1"/"y1" are not valid axis references; target each
    # subplot via row/col and position in domain coordinates
    fig.add_annotation(
        text=f"No data for {tf}",
        xref="x domain", yref="y domain",
        x=0.5, y=0.5, showarrow=False,
        row=1, col=i
    )
return fig
# Create subplots for each timeframe
fig = make_subplots(
rows=1, cols=4,
subplot_titles=secondary_timeframes,
shared_yaxes=True
)
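# shared_yaxes keeps the price scale comparable across the four
# timeframe panels in this single-row grid.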
# Loop through each timeframe
for i, timeframe in enumerate(secondary_timeframes, 1):
interval_key = timeframe
# Get candles for this timeframe
if interval_key in self.tick_storage.candles and self.tick_storage.candles[interval_key]:
# For rendering, limit to the last 100 candles for performance
candles = self.tick_storage.candles[interval_key][-100:]
if candles:
# Extract OHLC values
timestamps = [candle['timestamp'] for candle in candles]
opens = [candle['open'] for candle in candles]
highs = [candle['high'] for candle in candles]
lows = [candle['low'] for candle in candles]
closes = [candle['close'] for candle in candles]
# Add candlestick trace
fig.add_trace(go.Candlestick(
x=timestamps,
open=opens,
high=highs,
low=lows,
close=closes,
name=interval_key,
increasing_line_color='rgba(0, 180, 0, 0.7)',
decreasing_line_color='rgba(255, 0, 0, 0.7)',
showlegend=False
), row=1, col=i)
else:
    # Annotate the empty subplot; domain refs with row/col avoid
    # the invalid "x1"/"y1" axis references
    fig.add_annotation(
        text=f"No data for {interval_key}",
        xref="x domain", yref="y domain",
        x=0.5, y=0.5, showarrow=False,
        row=1, col=i
    )
# Update layout
fig.update_layout(
height=250,
template="plotly_dark",
showlegend=False,
margin=dict(l=0, r=0, t=30, b=0),
)
# Format Y-axis with 2 decimal places
fig.update_yaxes(tickformat=".2f")
# Format X-axis to show only the date (no time)
for i in range(1, 5):
    fig.update_xaxes(
        row=1, col=i,
        rangeslider_visible=False,
        # No weekend rangebreaks: crypto markets trade continuously,
        # so hiding sat-mon would drop real candles
        tickformat="%m-%d"  # show month-day only
    )
return fig
except Exception as e:
logger.error(f"Error updating secondary charts: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Return empty figure on error
return make_subplots(rows=1, cols=4)
def _get_position_list_rows(self):
"""Generate HTML for the positions list (last 10 positions only)"""
try:
if not hasattr(self, 'positions') or not self.positions:
# Return placeholder if no positions
return html.Div("No positions to display", style={"textAlign": "center", "padding": "20px"})
# Create table headers
table_header = [
html.Thead(html.Tr([
html.Th("ID"),
html.Th("Action"),
html.Th("Entry Price"),
html.Th("Exit Price"),
html.Th("Amount"),
html.Th("PnL"),
html.Th("Time")
]))
]
# Create table rows for only the last 10 positions to avoid overcrowding
rows = []
last_positions = self.positions[-10:] if len(self.positions) > 10 else self.positions
for position in last_positions:
# Format times
entry_time = position.entry_timestamp.strftime("%H:%M:%S")
exit_time = position.exit_timestamp.strftime("%H:%M:%S") if position.exit_timestamp else "-"
# Format PnL
pnl_value = position.pnl if position.pnl is not None else 0
pnl_text = f"${pnl_value:.2f}" if position.pnl is not None else "-"
pnl_style = {"color": "green" if pnl_value > 0 else "red"}
# Create row
row = html.Tr([
html.Td(position.trade_id),
html.Td(position.action),
html.Td(f"${position.entry_price:.2f}"),
html.Td(f"${position.exit_price:.2f}" if position.exit_price else "-"),
html.Td(f"{position.amount:.4f}"),
html.Td(pnl_text, style=pnl_style),
html.Td(f"{entry_time} → {exit_time}")
])
rows.append(row)
table_body = [html.Tbody(rows)]
# Add summary row for total PnL and other statistics
total_trades = len(self.positions)
winning_trades = sum(1 for p in self.positions if p.pnl and p.pnl > 0)
win_rate = winning_trades / total_trades * 100 if total_trades > 0 else 0
# Format display colors for PnL
pnl_color = "green" if self.accumulative_pnl >= 0 else "red"
summary_row = html.Tr([
html.Td("SUMMARY", colSpan=2, style={"fontWeight": "bold"}),
html.Td(f"Trades: {total_trades}"),
html.Td(f"Win Rate: {win_rate:.1f}%"),
html.Td("Total PnL:", style={"fontWeight": "bold"}),
html.Td(f"${self.accumulative_pnl:.2f}",
style={"color": pnl_color, "fontWeight": "bold"}),
html.Td(f"Balance: ${self.current_balance:.2f}")
], style={"backgroundColor": "rgba(80, 80, 80, 0.3)"})
# Create the table with improved styling
table = html.Table(
table_header + table_body + [html.Tfoot([summary_row])],
style={
"width": "100%",
"textAlign": "center",
"borderCollapse": "collapse",
"marginTop": "20px"
},
className="table table-striped table-dark"
)
return table
except Exception as e:
logger.error(f"Error generating position list: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return html.Div("Error displaying positions")
def get_candles(self, interval_seconds=60):
"""Get candles for the specified interval"""
try:
# Get candles from tick storage
interval_key = self._get_interval_key(interval_seconds)
candles_list = self.tick_storage.get_candles(interval_key)
if not candles_list:
logger.warning(f"No candle data available for {interval_key} - trying to load historical data")
# Try to load historical data if we don't have any
self.tick_storage.load_historical_data(self.symbol)
candles_list = self.tick_storage.get_candles(interval_key)
if not candles_list:
logger.error(f"Still no candle data available for {interval_key} after loading historical data")
return []
logger.info(f"Retrieved {len(candles_list)} candles for {interval_key}")
return candles_list
except Exception as e:
logger.error(f"Error getting candles: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return [] # Return empty list on error
def _get_interval_key(self, interval_seconds):
"""Convert interval seconds to a key used in the tick storage"""
if interval_seconds < 60:
return f"{interval_seconds}s"
elif interval_seconds < 3600:
return f"{interval_seconds // 60}m"
elif interval_seconds < 86400:
return f"{interval_seconds // 3600}h"
else:
return f"{interval_seconds // 86400}d"
def calculate_trade_rate(self):
"""Calculate and return trading rate statistics"""
now = datetime.now()
# Only calculate once per second to avoid unnecessary processing
if (now - self.last_trade_rate_calculation).total_seconds() < 1.0:
return self.trade_rate
self.last_trade_rate_calculation = now
# Clean up old trade times (older than 1 hour)
one_hour_ago = now - timedelta(hours=1)
self.trade_times = [t for t in self.trade_times if t > one_hour_ago]
if not self.trade_times:
self.trade_rate = {"per_second": 0, "per_minute": 0, "per_hour": 0}
return self.trade_rate
# Calculate rates based on time windows
last_second = now - timedelta(seconds=1)
last_minute = now - timedelta(minutes=1)
# Count trades in each time window
trades_last_second = sum(1 for t in self.trade_times if t > last_second)
trades_last_minute = sum(1 for t in self.trade_times if t > last_minute)
trades_last_hour = len(self.trade_times) # All remaining trades are from last hour
# Calculate rates
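# These are raw trade counts over sliding 1s/60s/3600s windows,
# not normalized per-unit-time rates.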
self.trade_rate = {
"per_second": trades_last_second,
"per_minute": trades_last_minute,
"per_hour": trades_last_hour
}
return self.trade_rate
def _update_chart_and_positions(self):
"""Update the chart with current data and positions"""
try:
# Force an update of the charts
self._update_main_chart(1) # Update 1s chart by default
self._update_secondary_charts()
logger.debug("Updated charts and positions")
return True
except Exception as e:
logger.error(f"Error updating chart and positions: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return False
async def start_websocket(self):
"""Start the websocket connection for real-time data"""
try:
# Step 1: Clear everything related to positions FIRST, before any other operations
logger.info(f"Initializing fresh chart for {self.symbol} - clearing all previous positions")
self.positions = [] # Clear positions list
self.accumulative_pnl = 0.0 # Reset accumulated PnL
self.current_balance = 100.0 # Reset balance
# Step 2: Clear any previous tick data to avoid using stale data from previous training sessions
self.tick_storage.ticks = []
# Step 3: Clear any previous 1s candles before loading historical data
self.tick_storage.candles['1s'] = []
logger.info("Initialized empty 1s candles, tick collection, and positions for fresh data")
# Load historical data first to ensure we have candles for all timeframes
logger.info(f"Loading historical data for {self.symbol}")
# Load historical data directly from tick_storage
self.tick_storage.load_historical_data(self.symbol)
# Double check that we have the 1s timeframe initialized
if '1s' not in self.tick_storage.candles:
self.tick_storage.candles['1s'] = []
logger.info(f"After loading historical data, 1s candles count: {len(self.tick_storage.candles['1s'])}")
# Render the charts once with historical data before the websocket starts
self._update_chart_and_positions()
# Initialize websocket
self.websocket = ExchangeWebSocket(self.symbol)
await self.websocket.connect()
logger.info(f"WebSocket connected for {self.symbol}")
# Counter for received ticks
tick_count = 0
last_update_time = time.time()
# Start receiving data
while self.websocket.running:
try:
data = await self.websocket.receive()
if data:
# Process the received data
if 'price' in data:
tick_count += 1
# Create a proper tick with timestamp object
tick = {
'price': data['price'],
'quantity': data.get('volume', 0),
'timestamp': pd.Timestamp(data['timestamp'], unit='ms')
}
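# pd.Timestamp(..., unit='ms') converts the exchange's epoch
# milliseconds into a pandas timestamp, the form tick_storage.add_tick
# receives below for candle aggregation.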
# Update tick storage with proper tick object
self.tick_storage.add_tick(tick)
# Store latest values
self.latest_price = data['price']
self.latest_volume = data.get('volume', 0)
self.latest_timestamp = datetime.fromtimestamp(data['timestamp'] / 1000)
# Force chart update every 5 seconds
current_time = time.time()
if current_time - last_update_time >= 5.0:
self._update_chart_and_positions()
last_update_time = current_time
logger.debug("Forced chart update after new ticks")
# Log tick processing for debugging (every 100 ticks)
if tick_count % 100 == 0:
logger.info(f"Processed {tick_count} ticks, current price: ${self.latest_price:.2f}")
logger.info(f"Current 1s candles count: {len(self.tick_storage.candles['1s'])}")
except Exception as e:
logger.error(f"Error processing websocket data: {str(e)}")
await asyncio.sleep(1) # Wait before retrying
except Exception as e:
logger.error(f"WebSocket error for {self.symbol}: {str(e)}")
import traceback
logger.error(traceback.format_exc())
finally:
if hasattr(self, 'websocket'):
await self.websocket.close()
def run(self, host='localhost', port=8050):
"""Run the Dash app on the specified host and port"""
try:
logger.info("="*60)
logger.info(f"🚀 STARTING WEB UI FOR {self.symbol}")
logger.info(f"📱 Web interface available at: http://{host}:{port}/")
logger.info(f"🌐 Open this URL in your browser to view the trading chart")
logger.info("="*60)
self.app.run(debug=False, use_reloader=False, host=host, port=port)
except Exception as e:
logger.error(f"Error running Dash app: {str(e)}")
import traceback
logger.error(traceback.format_exc())
def add_trade(self, action, price, amount, timestamp):
"""
Adds a trade to the chart's positions and tracks the trade time.
Handles closing previous open positions if necessary.
"""
try:
trade_timestamp = datetime.fromtimestamp(timestamp / 1000) if isinstance(timestamp, (int, float)) else timestamp
trade_timestamp = trade_timestamp or datetime.now()
# Close the previous open position if this trade is opposite
last_open_position = next((pos for pos in reversed(self.positions) if pos.is_open), None)
if last_open_position:
if (action == 'SELL' and last_open_position.action == 'BUY') or \
(action == 'BUY' and last_open_position.action == 'SELL'):
closed_pnl = last_open_position.close(price, trade_timestamp)
self.accumulative_pnl += closed_pnl
self.current_balance += closed_pnl # Simplified balance update
logger.info(f"Closed {last_open_position.action} position {last_open_position.trade_id} at {price:.2f}. PnL: {closed_pnl:.4f}")
# Create and add the new position
new_position = Position(
action=action,
entry_price=price,
amount=amount,
timestamp=trade_timestamp
)
self.positions.append(new_position)
self.trade_times.append(datetime.now()) # Use current time for rate calculation accuracy
logger.info(f"Added new {action} position {new_position.trade_id} at {price:.2f}, Time: {trade_timestamp}")
# Limit the number of stored positions and trade times to prevent memory issues
max_history = 1000
if len(self.positions) > max_history:
self.positions = self.positions[-max_history:]
if len(self.trade_times) > max_history * 5: # Keep more trade times for rate calc
self.trade_times = self.trade_times[-max_history*5:]
except Exception as e:
logger.error(f"Error adding trade to chart: {e}")
import traceback
logger.error(traceback.format_exc())
def add_nn_signal(self, symbol, signal, confidence, timestamp):
# Placeholder for adding NN signals if needed in the future
pass
def _update_main_chart(self, interval_seconds):
"""Update the main chart for the specified interval"""
try:
# Convert interval seconds to timeframe key
interval_key = self._get_interval_key(interval_seconds)
# Get candles for this timeframe
if interval_key not in self.tick_storage.candles or not self.tick_storage.candles[interval_key]:
logger.warning(f"No candle data available for {interval_key}")
# Return empty figure with a message
fig = go.Figure()
fig.add_annotation(
text=f"No data available for {interval_key}",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=20, color="white")
)
fig.update_layout(
template="plotly_dark",
height=600,
title=f"{self.symbol} - {interval_key} Chart",
xaxis_title="Time",
yaxis_title="Price ($)"
)
return fig
# Get candles (limit to last 500 for performance)
candles = self.tick_storage.candles[interval_key][-500:]
if not candles:
# Return empty figure if no candles
fig = go.Figure()
fig.add_annotation(
text=f"No candles available for {interval_key}",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=20, color="white")
)
fig.update_layout(
template="plotly_dark",
height=600,
title=f"{self.symbol} - {interval_key} Chart"
)
return fig
# Extract OHLC values
timestamps = [candle['timestamp'] for candle in candles]
opens = [candle['open'] for candle in candles]
highs = [candle['high'] for candle in candles]
lows = [candle['low'] for candle in candles]
closes = [candle['close'] for candle in candles]
volumes = [candle['volume'] for candle in candles]
# Create the main figure
fig = go.Figure()
# Add candlestick trace
fig.add_trace(go.Candlestick(
x=timestamps,
open=opens,
high=highs,
low=lows,
close=closes,
name=f"{self.symbol}",
increasing_line_color='rgba(0, 200, 0, 0.8)',
decreasing_line_color='rgba(255, 0, 0, 0.8)',
increasing_fillcolor='rgba(0, 200, 0, 0.3)',
decreasing_fillcolor='rgba(255, 0, 0, 0.3)'
))
# Add trade markers if we have positions
if hasattr(self, 'positions') and self.positions:
# Get recent positions (last 50 to avoid clutter)
recent_positions = self.positions[-50:] if len(self.positions) > 50 else self.positions
for position in recent_positions:
# Add entry marker
fig.add_trace(go.Scatter(
x=[position.entry_timestamp],
y=[position.entry_price],
mode='markers',
marker=dict(
symbol='triangle-up' if position.action == 'BUY' else 'triangle-down',
size=12,
color='green' if position.action == 'BUY' else 'red',
line=dict(width=2, color='white')
),
name=f"{position.action} Entry",
hovertemplate=f"{position.action} Entry
" +
f"Price: ${position.entry_price:.2f}
" +
f"Time: {position.entry_timestamp}
" +
f"ID: {position.trade_id}",
showlegend=False
))
# Add exit marker if position is closed
if not position.is_open and position.exit_price and position.exit_timestamp:
fig.add_trace(go.Scatter(
x=[position.exit_timestamp],
y=[position.exit_price],
mode='markers',
marker=dict(
symbol='triangle-down' if position.action == 'BUY' else 'triangle-up',
size=12,
color='blue',
line=dict(width=2, color='white')
),
name=f"{position.action} Exit",
hovertemplate=f"{position.action} Exit
" +
f"Price: ${position.exit_price:.2f}
" +
f"Time: {position.exit_timestamp}
" +
f"PnL: ${position.pnl:.2f}
" +
f"ID: {position.trade_id}",
showlegend=False
))
# Update layout
fig.update_layout(
template="plotly_dark",
height=600,
title=f"{self.symbol} - {interval_key} Chart (Live Trading)",
xaxis_title="Time",
yaxis_title="Price ($)",
showlegend=True,
margin=dict(l=0, r=0, t=40, b=0),
xaxis_rangeslider_visible=False,
hovermode='x unified'
)
# Format Y-axis with appropriate decimal places
fig.update_yaxes(tickformat=".2f")
# Format X-axis
fig.update_xaxes(
rangeslider_visible=False,
type='date'
)
return fig
except Exception as e:
logger.error(f"Error updating main chart: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Return error figure
fig = go.Figure()
fig.add_annotation(
text=f"Error loading chart: {str(e)}",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=16, color="red")
)
fig.update_layout(
template="plotly_dark",
height=600,
title=f"{self.symbol} Chart - Error"
)
return fig
class BinanceWebSocket:
"""Binance WebSocket implementation for real-time tick data"""
def __init__(self, symbol: str):
self.symbol = symbol.replace('/', '').lower()
self.ws = None
self.running = False
self.reconnect_delay = 1
self.max_reconnect_delay = 60
self.message_count = 0
# Binance WebSocket configuration
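# Binance serves raw trade streams at
# wss://stream.binance.com:9443/ws/<symbol>@trade, where <symbol>
# must be lowercase with no separator (hence the cleanup above).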
self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@trade"
logger.info(f"Initialized Binance WebSocket for symbol: {self.symbol}")
async def connect(self):
while True:
try:
logger.info(f"Attempting to connect to {self.ws_url}")
self.ws = await websockets.connect(self.ws_url)
logger.info("WebSocket connection established")
self.running = True
self.reconnect_delay = 1
logger.info(f"Successfully connected to Binance WebSocket for {self.symbol}")
return True
except Exception as e:
logger.error(f"WebSocket connection error: {str(e)}")
await asyncio.sleep(self.reconnect_delay)
self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
continue
async def receive(self) -> Optional[Dict]:
if not self.ws:
return None
try:
message = await self.ws.recv()
self.message_count += 1
if self.message_count % 100 == 0: # Log every 100th message to avoid spam
logger.info(f"Received message #{self.message_count}")
logger.debug(f"Raw message: {message[:200]}...")
data = json.loads(message)
# Process trade data
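# Trade events arrive as JSON like:
#   {"e": "trade", "T": 1690000000000, "p": "1850.21", "q": "0.50", ...}
# with T the trade time in ms and p/q as strings, hence float().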
if 'e' in data and data['e'] == 'trade':
trade_data = {
'timestamp': data['T'], # Trade time
'price': float(data['p']), # Price
'volume': float(data['q']), # Quantity
'type': 'trade'
}
logger.debug(f"Processed trade data: {trade_data}")
return trade_data
return None
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket connection closed")
self.running = False
return None
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...")
return None
except Exception as e:
logger.error(f"Error receiving message: {str(e)}")
return None
async def close(self):
"""Close the WebSocket connection"""
if self.ws:
await self.ws.close()
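# Minimal standalone usage sketch (illustrative only; assumes network
# access and a running asyncio event loop):
#
#   async def stream_trades():
#       ws = BinanceWebSocket("ETH/USDT")
#       await ws.connect()
#       while ws.running:
#           tick = await ws.receive()
#           if tick:
#               print(tick['timestamp'], tick['price'], tick['volume'])
#       await ws.close()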
class ExchangeWebSocket:
"""Generic WebSocket interface for cryptocurrency exchanges"""
def __init__(self, symbol: str, exchange: str = "binance"):
self.symbol = symbol
self.exchange = exchange.lower()
self.ws = None
# Initialize the appropriate WebSocket implementation
if self.exchange == "binance":
self.ws = BinanceWebSocket(symbol)
else:
raise ValueError(f"Unsupported exchange: {exchange}")
async def connect(self):
"""Connect to the exchange WebSocket"""
return await self.ws.connect()
async def receive(self) -> Optional[Dict]:
"""Receive data from the WebSocket"""
return await self.ws.receive()
async def close(self):
"""Close the WebSocket connection"""
await self.ws.close()
@property
def running(self):
"""Check if the WebSocket is running"""
return self.ws.running if self.ws else False
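# Adding another exchange only requires a class exposing the same
# connect/receive/close/running surface, e.g. (hypothetical, not
# implemented in this module):
#
#   elif self.exchange == "coinbase":
#       self.ws = CoinbaseWebSocket(symbol)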
async def main():
global charts # Make charts globally accessible for NN integration
symbols = ["ETH/USDT"] # Only use one symbol to simplify debugging
logger.info(f"Starting application for symbols: {symbols}")
# Initialize neural network if enabled
if NN_ENABLED:
logger.info("Initializing Neural Network integration...")
if setup_neural_network():
logger.info("Neural Network integration initialized successfully")
else:
logger.warning("Neural Network integration failed to initialize")
charts = []
websocket_tasks = []
# Create a chart and websocket task for each symbol
for symbol in symbols:
try:
# Create a proper Dash app for each chart
app = dash.Dash(__name__,
external_stylesheets=[dbc.themes.DARKLY],
suppress_callback_exceptions=True)
# Initialize the chart with the app
chart = RealTimeChart(
app=app,
symbol=symbol,
timeframe='1m',
standalone=True,
chart_title=f"{symbol} Realtime Trading Chart",
debug_mode=True,
port=8050,
height=800,
width=1200
)
charts.append(chart)
# Start the chart's websocket in a separate task
websocket_task = asyncio.create_task(chart.start_websocket())
websocket_tasks.append(websocket_task)
# Run the Dash app in a separate thread
port = 8050 + len(charts) - 1 # Use different ports for each chart
logger.info(f"Starting chart for {chart.symbol} on port {port}")
thread = Thread(target=lambda c=chart, p=port: c.run(port=p))
thread.daemon = True
thread.start()
logger.info(f"Thread started for {chart.symbol} on port {port}")
except Exception as e:
logger.error(f"Error initializing chart for {symbol}: {str(e)}")
import traceback
logger.error(traceback.format_exc())
try:
# Keep the main task running
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down...")
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
finally:
for task in websocket_tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Application terminated by user")