import asyncio
import json
import logging
import datetime
from typing import Dict, List, Optional
import websockets
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import dash
from dash import html, dcc
from dash.dependencies import Input, Output
import pandas as pd
import numpy as np
from collections import deque
import time
from threading import Thread
import requests
import os
from datetime import datetime, timedelta
import pytz
import tzlocal
import threading
import random
import dash_bootstrap_components as dbc
# Root logging setup: records at INFO and above are sent both to the console
# and to 'realtime_chart.log'; each record carries the emitting file and line.
logging.basicConfig(
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('realtime_chart.log'),
    ],
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by everything below.
logger = logging.getLogger(__name__)
# Optional neural-network integration: active only when the environment
# variable ENABLE_NN_MODELS is exactly '1'.
NN_ENABLED = os.getenv('ENABLE_NN_MODELS', '0') == '1'
nn_orchestrator = None
nn_inference_thread = None

if NN_ENABLED:
    try:
        import sys

        # Make the directory containing this file importable so the NN
        # package can be resolved from it.
        project_root = os.path.dirname(os.path.abspath(__file__))
        if project_root not in sys.path:
            sys.path.append(project_root)

        from NN.main import NeuralNetworkOrchestrator
        logger.info("Neural Network module enabled")
    except ImportError as import_error:
        # A missing NN package is not fatal: run without NN features.
        logger.warning(f"Failed to import Neural Network module, disabling NN features: {str(import_error)}")
        NN_ENABLED = False
# NN utility functions
def setup_neural_network():
    """Create the global neural-network orchestrator if NN support is enabled.

    Settings come from NN_* environment variables with built-in defaults.
    When NN_INFERENCE_INTERVAL is greater than zero, the background inference
    thread is started as well.

    Returns:
        True when the orchestrator was created; False when NN support is
        disabled or initialization raised (NN_ENABLED is then set to False).
    """
    global nn_orchestrator, NN_ENABLED

    if not NN_ENABLED:
        return False

    read_env = os.environ.get
    try:
        # Pull the settings from the environment, falling back to defaults
        traded_symbol = read_env('NN_SYMBOL', 'BTC/USDT')
        timeframe_list = read_env('NN_TIMEFRAMES', '1m,5m,1h,4h,1d').split(',')
        n_outputs = int(read_env('NN_OUTPUT_SIZE', '3'))  # 3 for BUY/HOLD/SELL

        config = {
            'symbol': traded_symbol,
            'timeframes': timeframe_list,
            'window_size': int(read_env('NN_WINDOW_SIZE', '20')),
            'n_features': 5,  # OHLCV
            'output_size': n_outputs,
            'model_dir': 'NN/models/saved',
            'data_dir': 'NN/data'
        }

        logger.info(f"Initializing Neural Network Orchestrator with config: {config}")
        nn_orchestrator = NeuralNetworkOrchestrator(config)

        # A non-positive interval means "do not run periodic inference"
        interval = int(read_env('NN_INFERENCE_INTERVAL', '60'))
        if interval > 0:
            start_nn_inference_thread(interval)
    except Exception as setup_error:
        logger.error(f"Error setting up neural network: {str(setup_error)}")
        import traceback
        logger.error(traceback.format_exc())
        NN_ENABLED = False
        return False

    return True
def start_nn_inference_thread(interval_seconds):
    """Launch a daemon thread that runs neural-network inference periodically.

    Args:
        interval_seconds: Pause between two successful inference runs.

    Returns:
        True when the thread was started; False when NN support is disabled
        or the orchestrator has not been created.
    """
    global nn_inference_thread

    if not NN_ENABLED or nn_orchestrator is None:
        logger.warning("Cannot start inference thread - Neural Network not enabled or initialized")
        return False

    def _publish_signal(result, charts):
        """Push one inference result to every chart exposing add_nn_signal."""
        if 'action' not in result:
            return
        action = result['action']
        timestamp = datetime.fromisoformat(result['timestamp'].replace('Z', '+00:00'))

        # Prefer a single probability, else look the action up in the table
        if 'probability' in result:
            probability = result['probability']
        elif 'probabilities' in result:
            probability = result['probabilities'].get(action, None)
        else:
            probability = None

        for chart in charts:
            if hasattr(chart, 'add_nn_signal'):
                chart.add_nn_signal(action, timestamp, probability)

    def inference_worker():
        """Loop forever: run inference, forward the signal, sleep."""
        import traceback

        model_type = os.environ.get('NN_MODEL_TYPE', 'cnn')
        timeframe = os.environ.get('NN_TIMEFRAME', '1h')
        logger.info(f"Starting neural network inference thread with {interval_seconds}s interval")
        logger.info(f"Using model type: {model_type}, timeframe: {timeframe}")

        # Give the charts a moment to come up before the first run
        time.sleep(5)

        active_charts = []
        while True:
            try:
                # Pick up the module-level 'charts' list once it exists
                if not active_charts and 'charts' in globals():
                    active_charts = globals()['charts']
                    logger.info(f"Found {len(active_charts)} active charts for NN signals")

                result = nn_orchestrator.run_inference_pipeline(
                    model_type=model_type,
                    timeframe=timeframe
                )

                if result:
                    logger.info(f"Neural network inference result: {result}")
                    if active_charts:
                        try:
                            _publish_signal(result, active_charts)
                        except Exception as signal_error:
                            logger.error(f"Error adding NN signal to chart: {str(signal_error)}")
                            logger.error(traceback.format_exc())

                time.sleep(interval_seconds)
            except Exception as loop_error:
                logger.error(f"Error in inference thread: {str(loop_error)}")
                logger.error(traceback.format_exc())
                time.sleep(5)  # Short back-off before the next attempt

    nn_inference_thread = threading.Thread(target=inference_worker, daemon=True)
    nn_inference_thread.start()
    return True
# Detect the local timezone once at import time; fall back to Sofia/EET when
# detection fails.
try:
    local_timezone = tzlocal.get_localzone()
    # Resolve a printable name. pytz timezones expose it as `.zone`,
    # zoneinfo.ZoneInfo objects as `.key`; anything else is stringified.
    try:
        if hasattr(local_timezone, 'zone'):
            tz_name = local_timezone.zone
        elif hasattr(local_timezone, 'key'):
            tz_name = local_timezone.key
        else:
            tz_name = str(local_timezone)
    except Exception:
        # Only ordinary errors are absorbed here; SystemExit and
        # KeyboardInterrupt are allowed to propagate.
        tz_name = "Local"
    logger.info(f"Detected local timezone: {local_timezone} ({tz_name})")
except Exception as e:
    logger.warning(f"Could not detect local timezone: {str(e)}. Defaulting to Sofia/EET")
    local_timezone = pytz.timezone('Europe/Sofia')
    tz_name = "Europe/Sofia"
def convert_to_local_time(timestamp):
    """Return *timestamp* expressed in the module's local timezone.

    pandas Timestamps, numpy datetime64 values and strings are first turned
    into a Python datetime; any other value is used as given. A datetime
    without tzinfo is treated as UTC. If anything fails, the error is logged
    and the input is returned unchanged.
    """
    try:
        if isinstance(timestamp, pd.Timestamp):
            pandas_value = timestamp
        elif isinstance(timestamp, np.datetime64):
            pandas_value = pd.Timestamp(timestamp)
        elif isinstance(timestamp, str):
            pandas_value = pd.to_datetime(timestamp)
        else:
            pandas_value = None

        moment = timestamp if pandas_value is None else pandas_value.to_pydatetime()

        # Naive datetimes are assumed to be UTC
        if moment.tzinfo is None:
            moment = moment.replace(tzinfo=pytz.UTC)

        return moment.astimezone(local_timezone)
    except Exception as conversion_error:
        logger.error(f"Error converting timestamp to local time: {str(conversion_error)}")
        return timestamp
class TradeTickStorage:
    """Storage for trade ticks with a maximum age limit.

    A tick is a dict with a ``price``, a ``volume`` and a ``timestamp`` in
    milliseconds since the epoch. Recent ticks are periodically written to a
    CSV file under ``cache_dir`` and read back on construction when that file
    is less than 10 minutes old.
    """

    # Column layout of the empty frame get_candles() returns when no ticks exist.
    _CANDLE_COLUMNS = ('timestamp', 'open', 'high', 'low', 'close', 'volume')

    def __init__(
        self,
        symbol: str = None,
        max_age_seconds: int = 3600,  # 1 hour by default, up from 30 min
        use_sample_data: bool = True,
        log_no_ticks_warning: bool = False,
    ):
        """Initialize the tick storage

        Args:
            symbol: Trading symbol. When None, no cache file is read or written.
            max_age_seconds: Maximum age for ticks to be stored
            use_sample_data: If True, generate sample ticks when no real ticks available
            log_no_ticks_warning: If True, log a warning when no ticks are available
        """
        self.symbol = symbol
        self.ticks = []
        self.max_age_seconds = max_age_seconds
        self.last_cleanup_time = time.time()
        self.cleanup_interval = 60  # seconds
        self.cache_dir = "cache"
        self.use_sample_data = use_sample_data
        self.log_no_ticks_warning = log_no_ticks_warning
        self.last_sample_price = 83000.0  # Starting price for sample data (for BTC)
        self.last_sample_time = time.time() * 1000  # Starting time for sample data
        self.last_tick_time = 0
        self.tick_count = 0

        # exist_ok avoids the check-then-create race of a separate exists() test
        os.makedirs(self.cache_dir, exist_ok=True)

        self._load_cached_ticks()
        logger.info(f"Initialized TradeTickStorage for {symbol} with max age: {max_age_seconds} seconds, cleanup interval: {self.cleanup_interval} seconds")

    def add_tick(self, tick: Dict):
        """Add a tick to storage

        The caller's dict is never modified; a normalized copy is stored.

        Args:
            tick: Tick data dict with fields:
                price: The price
                volume: The volume (defaults to 0.01 when absent)
                timestamp: Timestamp in milliseconds (defaults to the current
                    time when absent or not convertible to an integer)
        """
        if not tick:
            return

        # Normalize a private copy so the caller's dict stays untouched
        record = dict(tick)

        if 'timestamp' not in record:
            record['timestamp'] = int(time.time() * 1000)
        elif not isinstance(record['timestamp'], int):
            try:
                # Accept floats and numeric strings
                record['timestamp'] = int(float(record['timestamp']))
            except (ValueError, TypeError, OverflowError):
                # NaN, infinity, None or garbage: fall back to "now"
                record['timestamp'] = int(time.time() * 1000)

        record.setdefault('volume', 0.01)  # Default small volume

        self.ticks.append(record)
        self.last_tick_time = max(self.last_tick_time, record['timestamp'])

        self.tick_count += 1
        # Cache every 100 ticks to avoid data loss
        if self.tick_count % 100 == 0:
            self._cache_ticks()
        # Periodically clean up old ticks
        if self.tick_count % 1000 == 0:
            self._cleanup()

    def _cleanup(self):
        """Remove ticks older than max_age_seconds"""
        now = int(time.time() * 1000)
        cutoff = now - (self.max_age_seconds * 1000)
        original_count = len(self.ticks)
        self.ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff]
        removed = original_count - len(self.ticks)
        if removed > 0:
            logger.debug(f"Cleaned up {removed} old ticks, remaining: {len(self.ticks)}")

    def _cache_file_path(self) -> Optional[str]:
        """Return the symbol-specific cache file path, or None without a symbol."""
        if self.symbol is None:
            return None
        symbol_safe = self.symbol.replace("/", "_").replace("-", "_").lower()
        return os.path.join(self.cache_dir, f"{symbol_safe}_recent_ticks.csv")

    def _load_cached_ticks(self):
        """Load cached ticks from disk on startup"""
        cache_file = self._cache_file_path()
        if cache_file is None or not os.path.exists(cache_file):
            logger.info(f"No cached ticks found for {self.symbol}")
            return

        try:
            # Only a cache written within the last 10 minutes is reused
            file_age = time.time() - os.path.getmtime(cache_file)
            if file_age > 600:
                logger.info(f"Cached ticks for {self.symbol} are too old ({file_age:.1f}s), skipping")
                return

            tick_df = pd.read_csv(cache_file)
            if tick_df.empty:
                logger.info(f"Cached ticks file for {self.symbol} is empty")
                return

            cached_ticks = tick_df.to_dict('records')
            self.ticks.extend(cached_ticks)
            logger.info(f"Loaded {len(cached_ticks)} cached ticks for {self.symbol} from {cache_file}")
        except Exception as e:
            # Best effort: a broken cache must not prevent startup
            logger.exception(f"Error loading cached ticks for {self.symbol}: {str(e)}")

    def _cache_ticks(self):
        """Cache ticks from the last 10 minutes to disk"""
        if not self.ticks:
            return

        cache_file = self._cache_file_path()
        if cache_file is None:
            return

        now = int(time.time() * 1000)
        cutoff = now - (600 * 1000)  # 10 minutes in ms
        recent_ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff]
        if not recent_ticks:
            logger.debug("No recent ticks to cache")
            return

        try:
            pd.DataFrame(recent_ticks).to_csv(cache_file, index=False)
            logger.info(f"Cached {len(recent_ticks)} recent ticks for {self.symbol} to {cache_file}")
        except Exception as e:
            logger.error(f"Error caching ticks: {str(e)}")

    def get_latest_price(self) -> Optional[float]:
        """Get the price of the most recently added tick, or None when empty"""
        if self.ticks:
            return self.ticks[-1].get('price')
        return None

    def get_price_stats(self) -> Dict:
        """Get stats about the prices in storage

        Returns:
            Dict with 'min', 'max', 'latest', 'count' and 'age_seconds' (time
            between the first and last stored tick).
        """
        if not self.ticks:
            return {
                'min': None,
                'max': None,
                'latest': None,
                'count': 0,
                'age_seconds': 0
            }

        prices = [tick['price'] for tick in self.ticks]
        latest_timestamp = self.ticks[-1]['timestamp']
        oldest_timestamp = self.ticks[0]['timestamp']
        return {
            'min': min(prices),
            'max': max(prices),
            'latest': prices[-1],
            'count': len(prices),
            'age_seconds': (latest_timestamp - oldest_timestamp) / 1000
        }

    def get_ticks_as_df(self) -> pd.DataFrame:
        """Return ticks as a DataFrame sorted by time, timestamps as datetimes

        An empty DataFrame is returned when there are no ticks or the
        timestamps cannot be converted.
        """
        if not self.ticks:
            logger.warning("No ticks available for DataFrame conversion")
            return pd.DataFrame()

        # Drop expired ticks first
        self._cleanup()

        # sorted() builds a new list, leaving the stored order untouched
        df = pd.DataFrame(sorted(self.ticks, key=lambda x: x['timestamp']))
        if df.empty:
            return df

        logger.debug(f"Converting timestamps for {len(df)} ticks")
        if 'timestamp' not in df.columns:
            logger.error("Tick data missing timestamp column")
            return pd.DataFrame()

        sample_ts = df['timestamp'].iloc[0]
        logger.debug(f"Sample timestamp before conversion: {sample_ts}, type: {type(sample_ts)}")
        try:
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            logger.debug("Timestamps converted to datetime successfully")
            logger.debug(f"Sample converted timestamp: {df['timestamp'].iloc[0]}")
        except Exception as e:
            logger.exception(f"Error converting timestamps: {str(e)}")
            return pd.DataFrame()
        return df

    def _add_sample_ticks(self, interval_seconds: int) -> None:
        """Store 20 synthetic ticks spaced half an interval apart, ending now."""
        current_time = int(time.time() * 1000)
        base = self.last_sample_price
        sample_ticks = []
        for i in range(20):
            # Slight trend around the base price plus a small random change
            trend_price = base * (1 + 0.0001 * (10 - i))
            tick_price = trend_price * (1 + random.uniform(-0.002, 0.002))
            sample_ticks.append({
                'price': tick_price,
                'volume': random.uniform(0.01, 0.5),
                'timestamp': current_time - (i * interval_seconds * 1000 // 2),
                'is_sample': True
            })

        # Index 0 is the most recent sample
        self.last_sample_price = sample_ticks[0]['price']
        self.last_sample_time = sample_ticks[0]['timestamp']

        # Add the sample ticks in chronological order
        for tick in sorted(sample_ticks, key=lambda x: x['timestamp']):
            self.add_tick(tick)

    @staticmethod
    def _to_datetime(series: pd.Series) -> Optional[pd.Series]:
        """Convert a timestamp column to datetimes; None when every attempt fails."""
        try:
            if pd.api.types.is_integer_dtype(series):
                # Integer timestamps are milliseconds since the epoch
                return pd.to_datetime(series, unit='ms')
            return pd.to_datetime(series)
        except Exception as conv_error:
            logger.error(f"Error converting timestamps to datetime: {str(conv_error)}")

        try:
            # Values above 1e12 are taken as milliseconds, anything else as seconds
            unit = 'ms' if series.iloc[0] > 1000000000000 else 's'
            return pd.to_datetime(series, unit=unit)
        except Exception as fallback_error:
            logger.error(f"Fallback timestamp conversion failed: {str(fallback_error)}")
            return None

    def _index_by_timestamp(self, df: pd.DataFrame) -> Optional[pd.DataFrame]:
        """De-duplicate timestamps and make them a datetime index; None on failure."""
        try:
            duplicate_count = df['timestamp'].duplicated().sum()
            if duplicate_count > 0:
                logger.warning(f"Found {duplicate_count} duplicate timestamps, keeping the last occurrence")
                df = df.drop_duplicates(subset='timestamp', keep='last')

            if not pd.api.types.is_datetime64_any_dtype(df['timestamp']):
                logger.debug("Converting timestamp to datetime")
                converted = self._to_datetime(df['timestamp'])
                if converted is None:
                    return None
                # assign() returns a new frame, so no view of the input is written to
                df = df.assign(timestamp=converted)

            return df.set_index('timestamp')
        except Exception as prep_error:
            logger.exception(f"Error preprocessing DataFrame for resampling: {str(prep_error)}")
            return None

    @staticmethod
    def _resample_ohlcv(df: pd.DataFrame, interval_str: str) -> Optional[pd.DataFrame]:
        """Build timestamp-indexed OHLCV candles, trying three strategies in turn."""
        # First attempt - aggregate each column separately
        try:
            price_bins = df['price'].resample(interval_str)
            candles = pd.DataFrame({
                'open': price_bins.first(),
                'high': price_bins.max(),
                'low': price_bins.min(),
                'close': price_bins.last(),
                'volume': df['volume'].resample(interval_str).sum()
            })
            logger.debug(f"Successfully created {len(candles)} candles with individual column resampling")
            return candles
        except Exception as resample_error:
            logger.error(f"Error in individual column resampling: {str(resample_error)}")

        # Second attempt - built-in agg method
        try:
            logger.debug("Trying fallback resampling method with agg()")
            candles = df.resample(interval_str).agg({
                'price': ['first', 'max', 'min', 'last'],
                'volume': 'sum'
            })
            # Flatten MultiIndex columns
            candles.columns = ['open', 'high', 'low', 'close', 'volume']
            logger.debug(f"Successfully created {len(candles)} candles with agg() method")
            return candles
        except Exception as agg_error:
            logger.error(f"Error in agg() resampling: {str(agg_error)}")

        # Third attempt - manual candle construction
        try:
            logger.debug("Trying manual candle construction method")
            candle_data = [
                {
                    'timestamp': bucket_start,
                    'open': group['price'].iloc[0],
                    'high': group['price'].max(),
                    'low': group['price'].min(),
                    'close': group['price'].iloc[-1],
                    'volume': group['volume'].sum() if 'volume' in group.columns else 0
                }
                for bucket_start, group in df.resample(interval_str)
                if not group.empty
            ]
            if not candle_data:
                logger.warning("No candles created with manual method")
                return None
            # Index by timestamp like the other strategies so the caller's
            # reset_index() does not add a stray 'index' column.
            candles = pd.DataFrame(candle_data).set_index('timestamp')
            logger.debug(f"Successfully created {len(candles)} candles with manual method")
            return candles
        except Exception as manual_error:
            logger.exception(f"Error in manual candle construction: {str(manual_error)}")
            return None

    def get_candles(self, interval_seconds: int = 1, start_time_ms: int = None, end_time_ms: int = None) -> pd.DataFrame:
        """Generate candlestick data from ticks

        When no ticks match and use_sample_data is set, synthetic ticks are
        generated and stored first.

        Args:
            interval_seconds: Interval in seconds for each candle
            start_time_ms: Start time in milliseconds
            end_time_ms: End time in milliseconds

        Returns:
            DataFrame with timestamp/open/high/low/close/volume columns; an
            empty DataFrame when no candles could be built.
        """
        ticks = self.get_ticks_from_time(start_time_ms, end_time_ms)

        if not ticks:
            if not self.use_sample_data:
                if self.log_no_ticks_warning:
                    logger.warning("No ticks available for candle formation")
                return pd.DataFrame(columns=list(self._CANDLE_COLUMNS))

            self._add_sample_ticks(interval_seconds)
            # Try again with the new ticks
            ticks = self.get_ticks_from_time(start_time_ms, end_time_ms)
            if not ticks and self.log_no_ticks_warning:
                logger.warning("Still no ticks available after adding sample data")

        # Ensure ticks are up to date
        try:
            self._cleanup()
        except Exception as cleanup_error:
            logger.error(f"Error cleaning up ticks: {str(cleanup_error)}")

        df = pd.DataFrame(ticks)
        if df.empty:
            logger.warning("Tick DataFrame is empty after filtering/conversion")
            return pd.DataFrame()

        logger.info(f"Preparing to create candles from {len(df)} ticks with {interval_seconds}s interval")

        for col in ('timestamp', 'price', 'volume'):
            if col not in df.columns:
                logger.error(f"Required column '{col}' missing from tick data")
                return pd.DataFrame()

        df = self._index_by_timestamp(df)
        if df is None:
            return pd.DataFrame()

        if len(df) < 2:
            # A single tick cannot be resampled; it becomes one flat candle
            logger.warning("Not enough data points for resampling, using direct data")
            if len(df) == 0:
                return pd.DataFrame()
            price_val = df['price'].iloc[0]
            return pd.DataFrame({
                'timestamp': [df.index[0]],
                'open': [price_val],
                'high': [price_val],
                'low': [price_val],
                'close': [price_val],
                'volume': [df['volume'].iloc[0]]
            })

        # Use 's' instead of the deprecated 'S' frequency alias
        interval_str = f'{interval_seconds}s'
        logger.debug(f"Resampling with interval: {interval_str}")
        candles = self._resample_ohlcv(df, interval_str)

        if candles is None or candles.empty:
            logger.warning("No candles were created after all resampling attempts")
            return pd.DataFrame()

        # Reset index to get timestamp as column
        try:
            candles = candles.reset_index()
        except Exception as reset_error:
            logger.error(f"Error resetting index: {str(reset_error)}")
            try:
                candles_dict = candles.to_dict('list')
                candles_dict['timestamp'] = candles.index.to_list()
                candles = pd.DataFrame(candles_dict)
            except Exception as fallback_error:
                logger.error(f"Error in fallback index reset: {str(fallback_error)}")
                return pd.DataFrame()

        # Intervals without any tick come out of resampling as NaN rows
        try:
            nan_count_before = candles.isna().sum().sum()
            if nan_count_before > 0:
                logger.warning(f"Found {nan_count_before} NaN values in candles, dropping them")
                candles = candles.dropna()
        except Exception as nan_error:
            logger.error(f"Error handling NaN values: {str(nan_error)}")
            # Try to fill NaN values instead of dropping
            try:
                candles = candles.ffill().bfill()
            except Exception as fill_error:
                logger.error(f"Error filling NaN values: {str(fill_error)}")

        logger.debug(f"Generated {len(candles)} candles from {len(df)} ticks")
        return candles

    def get_candle_stats(self) -> Dict:
        """Get candle count and time range for a fixed set of intervals

        Returns:
            Dict keyed by '<seconds>s' with 'count' and 'time_range' entries.
        """
        stats = {}
        for interval in (1, 5, 15, 60, 300, 900, 3600):
            candles = self.get_candles(interval_seconds=interval)
            count = len(candles) if not candles.empty else 0

            time_range = None
            if count > 0:
                try:
                    start_time = candles['timestamp'].min()
                    end_time = candles['timestamp'].max()
                    if isinstance(start_time, pd.Timestamp):
                        start_time = start_time.strftime('%Y-%m-%d %H:%M:%S')
                    if isinstance(end_time, pd.Timestamp):
                        end_time = end_time.strftime('%Y-%m-%d %H:%M:%S')
                    time_range = f"{start_time} to {end_time}"
                except Exception:
                    time_range = "Unknown"

            stats[f"{interval}s"] = {
                'count': count,
                'time_range': time_range
            }
        return stats

    def get_ticks_from_time(self, start_time_ms: int = None, end_time_ms: int = None) -> List[Dict]:
        """Get ticks within a specific time range

        Args:
            start_time_ms: Start time in milliseconds (None for no lower bound)
            end_time_ms: End time in milliseconds (None for no upper bound)

        Returns:
            List of ticks within the time range (bounds inclusive)
        """
        if not self.ticks:
            return []

        # Drop expired ticks first
        self._cleanup()

        filtered_ticks = self.ticks
        if start_time_ms is not None:
            filtered_ticks = [tick for tick in filtered_ticks if tick['timestamp'] >= start_time_ms]
        if end_time_ms is not None:
            filtered_ticks = [tick for tick in filtered_ticks if tick['timestamp'] <= end_time_ms]

        logger.debug(f"Retrieved {len(filtered_ticks)} ticks from time range {start_time_ms} to {end_time_ms}")
        return filtered_ticks

    def get_time_based_stats(self) -> Dict:
        """Get statistics about the ticks organized by time periods

        Returns:
            Dictionary with overall figures and, under 'periods', statistics
            for the last 1, 5, 15 and 30 minutes (only periods containing ticks).
        """
        if not self.ticks:
            return {
                'total_ticks': 0,
                'periods': {}
            }

        # Drop expired ticks first
        self._cleanup()

        now = int(time.time() * 1000)
        period_minutes = {'1min': 1, '5min': 5, '15min': 15, '30min': 30}

        stats = {
            'total_ticks': len(self.ticks),
            'oldest_tick': self.ticks[0]['timestamp'] if self.ticks else None,
            'newest_tick': self.ticks[-1]['timestamp'] if self.ticks else None,
            'time_span_seconds': (self.ticks[-1]['timestamp'] - self.ticks[0]['timestamp']) / 1000 if self.ticks else 0,
            'periods': {}
        }

        for period_name, minutes in period_minutes.items():
            cutoff_time = now - (minutes * 60 * 1000)
            period_ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff_time]
            if not period_ticks:
                continue

            prices = [tick['price'] for tick in period_ticks]
            volumes = [tick.get('volume', 0) for tick in period_ticks]
            stats['periods'][period_name] = {
                'tick_count': len(period_ticks),
                'min_price': min(prices),
                'max_price': max(prices),
                'avg_price': sum(prices) / len(prices),
                'last_price': period_ticks[-1]['price'],
                'total_volume': sum(volumes),
                'ticks_per_second': len(period_ticks) / (minutes * 60)
            }

        logger.debug(f"Generated time-based stats: {len(stats['periods'])} periods")
        return stats
class CandlestickData:
    """Bounded history of OHLCV candles built incrementally from trades.

    Finished candles live in fixed-length deques (oldest entries fall off);
    the candle still being formed is kept separately in ``current_candle``.
    """

    def __init__(self, max_length: int = 300):
        """Create empty buffers holding at most *max_length* finished candles."""
        for buffer_name in ('timestamps', 'opens', 'highs', 'lows', 'closes', 'volumes'):
            setattr(self, buffer_name, deque(maxlen=max_length))
        self.current_candle = dict(
            timestamp=None, open=None, high=None, low=None, close=None, volume=0
        )
        self.candle_interval = 1  # candle width in seconds

    def update_from_trade(self, trade: Dict):
        """Fold one trade into the candle covering its timestamp.

        Args:
            trade: Dict with 'timestamp' (milliseconds), 'price' and an
                optional 'volume' (0 when missing).
        """
        timestamp = trade['timestamp']
        price = trade['price']
        volume = trade.get('volume', 0)

        # Start of the interval this trade falls into
        width_ms = self.candle_interval * 1000
        candle_timestamp = int(timestamp / width_ms) * width_ms

        live = self.current_candle
        if live['timestamp'] == candle_timestamp:
            # Same interval: stretch the extremes and move the close
            if live['high'] is None or price > live['high']:
                live['high'] = price
            if live['low'] is None or price < live['low']:
                live['low'] = price
            live['close'] = price
            live['volume'] += volume
            logger.debug(f"Updated current candle: {self.current_candle}")
            return

        # Different interval: archive the finished candle, if there is one
        if live['timestamp'] is not None:
            self.timestamps.append(live['timestamp'])
            self.opens.append(live['open'])
            self.highs.append(live['high'])
            self.lows.append(live['low'])
            self.closes.append(live['close'])
            self.volumes.append(live['volume'])
            logger.debug(f"New candle saved: {self.current_candle}")

        self.current_candle = {
            'timestamp': candle_timestamp,
            'open': price,
            'high': price,
            'low': price,
            'close': price,
            'volume': volume
        }
        logger.debug(f"New candle started: {self.current_candle}")

    def get_dataframe(self) -> pd.DataFrame:
        """Return finished candles plus the in-progress one as a DataFrame.

        Timestamps are converted from milliseconds to datetimes when the
        frame is not empty.
        """
        columns = {
            'timestamp': list(self.timestamps),
            'open': list(self.opens),
            'high': list(self.highs),
            'low': list(self.lows),
            'close': list(self.closes),
            'volume': list(self.volumes)
        }

        # The candle being formed goes last; its keys match the column names
        live = self.current_candle
        if live['timestamp'] is not None:
            for key, values in columns.items():
                values.append(live[key])

        df = pd.DataFrame(columns)
        if not df.empty:
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        return df
class BinanceWebSocket:
    """Client for the Binance trade stream of a single symbol."""

    def __init__(self, symbol: str):
        """Prepare (but do not open) the stream for *symbol*, e.g. 'BTC/USDT'."""
        # The stream name is the symbol without the slash, lower-cased
        self.symbol = symbol.replace('/', '').lower()
        self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@trade"
        self.ws = None
        self.running = False
        self.message_count = 0
        # Reconnect back-off bounds, in seconds
        self.reconnect_delay = 1
        self.max_reconnect_delay = 60
        logger.info(f"Initialized Binance WebSocket for symbol: {self.symbol}")

    async def connect(self):
        """Keep trying to connect until it works, then return True.

        After each failure the wait doubles, capped at max_reconnect_delay.
        """
        connected = False
        while not connected:
            try:
                logger.info(f"Attempting to connect to {self.ws_url}")
                self.ws = await websockets.connect(self.ws_url)
                logger.info("WebSocket connection established")
                self.running = True
                self.reconnect_delay = 1  # reset the back-off after a success
                logger.info(f"Successfully connected to Binance WebSocket for {self.symbol}")
                connected = True
            except Exception as connect_error:
                logger.error(f"WebSocket connection error: {str(connect_error)}")
                await asyncio.sleep(self.reconnect_delay)
                self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
        return True

    async def receive(self) -> Optional[Dict]:
        """Wait for one message and return it as a trade dict.

        Returns:
            Dict with 'timestamp', 'price', 'volume' and 'type' for a trade
            event; None when not connected, when the message is not a trade,
            or when receiving/decoding failed.
        """
        if not self.ws:
            return None

        try:
            message = await self.ws.recv()
            self.message_count += 1

            # Only every 100th message is logged, to keep the log readable
            if self.message_count % 100 == 0:
                logger.info(f"Received message #{self.message_count}")
                logger.debug(f"Raw message: {message[:200]}...")

            data = json.loads(message)
            if 'e' not in data or data['e'] != 'trade':
                return None

            trade_data = {
                'timestamp': data['T'],      # Trade time
                'price': float(data['p']),   # Price
                'volume': float(data['q']),  # Quantity
                'type': 'trade'
            }
            logger.debug(f"Processed trade data: {trade_data}")
            return trade_data
        except websockets.exceptions.ConnectionClosed:
            logger.warning("WebSocket connection closed")
            self.running = False
            return None
        except json.JSONDecodeError as decode_error:
            logger.error(f"JSON decode error: {str(decode_error)}, message: {message[:200]}...")
            return None
        except Exception as receive_error:
            logger.error(f"Error receiving message: {str(receive_error)}")
            return None

    async def close(self):
        """Close the connection, if one was opened, and mark the client stopped."""
        if not self.ws:
            return
        await self.ws.close()
        self.running = False
        logger.info("WebSocket connection closed")
class BinanceHistoricalData:
"""Fetch historical candle data from Binance"""
def __init__(self):
    """Set the klines endpoint and make sure the cache directory exists."""
    # Cached candles are stored in a 'cache' folder under the working directory
    cache_root = os.path.join(os.getcwd(), "cache")
    os.makedirs(cache_root, exist_ok=True)

    self.base_url = "https://api.binance.com/api/v3/klines"
    self.cache_dir = cache_root
    logger.info(f"Initialized BinanceHistoricalData with cache directory: {self.cache_dir}")
def _get_interval_string(self, interval_seconds: int) -> str:
    """Map a candle width in seconds to the Binance interval name.

    Unrecognized widths log a warning and map to "1m".
    """
    known_intervals = (
        (60, "1m"),
        (300, "5m"),
        (900, "15m"),
        (1800, "30m"),
        (3600, "1h"),
        (14400, "4h"),
        (86400, "1d"),
    )
    for seconds, binance_name in known_intervals:
        if interval_seconds == seconds:
            return binance_name

    # Default to 1m if not recognized
    logger.warning(f"Unrecognized interval {interval_seconds}s, defaulting to 1m")
    return "1m"
def _get_cache_filename(self, symbol: str, interval: str) -> str:
    """Build the path of the candle cache file for *symbol* and *interval*."""
    # A slash in the symbol would be read as a path separator
    file_name = "{}_{}_candles.csv".format(symbol.replace("/", "_"), interval)
    return os.path.join(self.cache_dir, file_name)
def _load_from_cache(self, symbol: str, interval: str) -> Optional[pd.DataFrame]:
    """Read cached candles for *symbol*/*interval* when the file is fresh.

    Returns:
        The cached candles with 'timestamp' parsed to datetimes, or None when
        the file is missing, expired, or cannot be read.
    """
    cache_path = self._get_cache_filename(symbol, interval)
    if not os.path.exists(cache_path):
        logger.debug(f"No cache file found for {symbol} {interval}")
        return None

    # Daily candles stay valid for a day, every other interval for an hour
    file_age = time.time() - os.path.getmtime(cache_path)
    max_age = 3600 if interval != "1d" else 86400
    if file_age > max_age:
        logger.debug(f"Cache for {symbol} {interval} is expired ({file_age:.1f}s old)")
        return None

    try:
        cached = pd.read_csv(cache_path)
        # Timestamps were written as text; turn them back into datetimes
        cached['timestamp'] = pd.to_datetime(cached['timestamp'])
    except Exception as load_error:
        logger.error(f"Error loading from cache: {str(load_error)}")
        return None

    logger.info(f"Loaded {len(cached)} candles from cache for {symbol} {interval}")
    return cached
def _save_to_cache(self, df: pd.DataFrame, symbol: str, interval: str) -> bool:
    """Write candles to the CSV cache file for ``symbol``/``interval``.

    Args:
        df: Candles to persist.
        symbol: Trading pair, e.g. "BTC/USDT".
        interval: Interval label, e.g. "1m".

    Returns:
        ``True`` when the file was written, ``False`` when ``df`` is empty
        or the write failed (the failure is logged, not raised).
    """
    if df.empty:
        logger.warning(f"No data to cache for {symbol} {interval}")
        return False

    target = self._get_cache_filename(symbol, interval)
    try:
        df.to_csv(target, index=False)
        logger.info(f"Cached {len(df)} candles for {symbol} {interval} to {target}")
    except Exception as e:
        logger.error(f"Error saving to cache: {str(e)}")
        return False
    else:
        return True
def get_historical_candles(self, symbol: str, interval_seconds: int, limit: int = 500) -> pd.DataFrame:
    """Get historical candle data for the specified symbol and interval.

    The on-disk cache is used when it holds at least ``limit`` candles;
    otherwise the candles are requested from ``self.base_url`` and the
    result is written back to the cache.

    Args:
        symbol: Trading pair, e.g. "BTC/USDT" (the slash is removed for
            the API request).
        interval_seconds: Candle duration in seconds.
        limit: Number of candles wanted.

    Returns:
        DataFrame with columns timestamp, open, high, low, close, volume.
        An empty DataFrame is returned when the API yields nothing or any
        error occurs (errors are logged, never raised).
    """
    # Convert to Binance format
    clean_symbol = symbol.replace("/", "")
    interval = self._get_interval_string(interval_seconds)

    # Try to load from cache first
    cached_data = self._load_from_cache(symbol, interval)
    if cached_data is not None and len(cached_data) >= limit:
        return cached_data.tail(limit)

    # Fetch from API if not cached or insufficient
    try:
        logger.info(f"Fetching {limit} historical candles for {symbol} ({interval}) from Binance API")
        params = {
            "symbol": clean_symbol,
            "interval": interval,
            "limit": limit
        }
        # requests has no default timeout: without one a stalled
        # connection would block the caller forever.
        response = requests.get(self.base_url, params=params, timeout=10)
        response.raise_for_status()  # Raise exception for HTTP errors

        candles = response.json()
        if not candles:
            logger.warning(f"No candles returned from Binance for {symbol} {interval}")
            return pd.DataFrame()

        # Each kline is a 12-element list: open time (ms), open, high,
        # low, close, volume, followed by fields this method discards.
        df = pd.DataFrame(candles, columns=[
            "timestamp", "open", "high", "low", "close", "volume",
            "close_time", "quote_asset_volume", "number_of_trades",
            "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
        ])

        # Convert types: epoch milliseconds -> datetime, strings -> float
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        price_columns = ["open", "high", "low", "close", "volume"]
        df[price_columns] = df[price_columns].astype(float)

        # Keep only needed columns
        df = df[["timestamp"] + price_columns]

        # Cache the results
        self._save_to_cache(df, symbol, interval)
        logger.info(f"Successfully fetched {len(df)} candles for {symbol} {interval}")
        return df
    except Exception as e:
        logger.error(f"Error fetching historical data for {symbol} {interval}: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return pd.DataFrame()
class ExchangeWebSocket:
    """Exchange-agnostic facade over a concrete exchange WebSocket client.

    Every call is forwarded to the implementation selected by ``exchange``
    ("binance" or "mexc", case-insensitive).
    """

    def __init__(self, symbol: str, exchange: str = "binance"):
        """Pick the client for ``exchange``.

        Raises:
            ValueError: If ``exchange`` is neither "binance" nor "mexc".
        """
        self.symbol = symbol
        self.exchange = exchange.lower()
        self.ws = None

        # Reject unknown exchanges up front, then build the one requested.
        if self.exchange not in ("binance", "mexc"):
            raise ValueError(f"Unsupported exchange: {exchange}")
        if self.exchange == "binance":
            self.ws = BinanceWebSocket(symbol)
        else:
            self.ws = MEXCWebSocket(symbol)

    async def connect(self):
        """Open the underlying connection and return its result."""
        result = await self.ws.connect()
        return result

    async def receive(self) -> Optional[Dict]:
        """Return the next message from the underlying client."""
        message = await self.ws.receive()
        return message

    async def close(self):
        """Shut down the underlying connection."""
        await self.ws.close()

    @property
    def running(self):
        """Whether the underlying client reports itself as running."""
        if not self.ws:
            return False
        return self.ws.running
class CandleCache:
    """Bounded in-memory store of candles, one deque per timeframe.

    Candles are kept as plain dicts (one per DataFrame row) for the '1s',
    '1m', '1h' and '1d' timeframes. Each deque discards its oldest entries
    once it holds ``max_candles`` items.
    """

    def __init__(self, max_candles: int = 5000):
        """Create the empty per-timeframe deques.

        Args:
            max_candles: Maximum number of candles kept per timeframe.
        """
        self.candles = {
            '1s': deque(maxlen=max_candles),
            '1m': deque(maxlen=max_candles),
            '1h': deque(maxlen=max_candles),
            '1d': deque(maxlen=max_candles)
        }
        logger.info(f"Initialized CandleCache with max candles: {max_candles}")

    def add_candles(self, interval: str, new_candles: pd.DataFrame):
        """Append every row of ``new_candles`` to the ``interval`` deque.

        Unknown intervals and empty frames are ignored. No de-duplication
        or ordering is applied here; see ``update_cache`` for that.
        """
        if interval in self.candles and not new_candles.empty:
            # One dict per row, built in a single pass instead of a
            # Series per row via iterrows().
            self.candles[interval].extend(new_candles.to_dict('records'))
            logger.debug(f"Added {len(new_candles)} candles to {interval} cache")

    def get_recent_candles(self, interval: str, count: int = 500) -> pd.DataFrame:
        """Return up to ``count`` of the newest cached candles.

        Args:
            interval: Timeframe key ('1s', '1m', '1h', '1d').
            count: Maximum number of candles to return; values <= 0 yield
                an empty DataFrame.

        Returns:
            DataFrame of the most recently stored candles, with
            ``timestamp`` converted to datetime when possible. Empty when
            the interval is unknown or has no candles.
        """
        if interval not in self.candles or not self.candles[interval]:
            logger.debug(f"No candles available for {interval}")
            return pd.DataFrame()

        all_candles = list(self.candles[interval])
        requested = count
        if count > len(all_candles):
            logger.debug(f"Requested {count} candles, but only have {len(all_candles)} for {interval}")
            count = len(all_candles)

        # Guard the slice: all_candles[-0:] would return the whole list.
        recent_candles = all_candles[-count:] if count > 0 else []
        logger.debug(f"Returning {len(recent_candles)} recent candles for {interval} (requested {requested})")

        df = pd.DataFrame(recent_candles)
        if not df.empty and 'timestamp' in df.columns:
            try:
                if not pd.api.types.is_datetime64_any_dtype(df['timestamp']):
                    df['timestamp'] = pd.to_datetime(df['timestamp'])
            except Exception as e:
                logger.warning(f"Error converting timestamps in get_recent_candles: {str(e)}")
        return df

    def update_cache(self, interval: str, new_candles: pd.DataFrame):
        """
        Update the candle cache for a specific timeframe with new candles

        Candles newer than the last cached one are appended. When none are
        newer, candles whose timestamp is absent from the cache are
        back-filled and the deque is re-sorted chronologically. The
        caller's DataFrame is never modified.

        Args:
            interval: The timeframe interval ('1s', '1m', '1h', '1d')
            new_candles: DataFrame with new candles to add to the cache
        """
        if interval not in self.candles:
            logger.warning(f"Invalid interval {interval} for cache update")
            return
        if new_candles is None or new_candles.empty:
            logger.debug(f"No new candles to update {interval} cache")
            return
        if 'timestamp' not in new_candles.columns:
            logger.warning(f"No timestamp column in new candles for {interval}")
            return

        try:
            # Copy BEFORE touching the timestamp column so the conversion
            # below cannot leak into the caller's frame.
            incoming = self._with_datetime_timestamps(interval, new_candles.copy())
            if incoming is None:
                return

            # If we have no candles in cache, add all new candles
            if not self.candles[interval]:
                logger.debug(f"No existing candles for {interval}, adding all {len(incoming)} candles")
                self.add_candles(interval, incoming)
                return

            last_cached_time = self._last_cached_timestamp(interval)
            logger.debug(f"Last cached timestamp for {interval}: {last_cached_time}")
            if last_cached_time is None:
                logger.debug(f"No valid last cached timestamp, adding all {len(incoming)} candles for {interval}")
                self.add_candles(interval, incoming)
                return

            # Convert last_cached_time to datetime if needed
            if not isinstance(last_cached_time, (pd.Timestamp, datetime)):
                try:
                    last_cached_time = pd.to_datetime(last_cached_time)
                except Exception as e:
                    logger.error(f"Cannot convert last cached time to datetime: {str(e)}")
                    # Add all candles as fallback
                    self.add_candles(interval, incoming)
                    return

            # Snapshot so a failure part-way through can be rolled back.
            cache_backup = list(self.candles[interval])
            try:
                newer = incoming[incoming['timestamp'] > last_cached_time]
                if not newer.empty:
                    logger.debug(f"Adding {len(newer)} new candles for {interval}")
                    self.add_candles(interval, newer)
                else:
                    self._backfill_missing(interval, incoming)
            except Exception as filter_error:
                logger.error(f"Error filtering candles by timestamp: {str(filter_error)}")
                # Restore from backup, then add all candles as fallback
                self.candles[interval] = deque(cache_backup, maxlen=self.candles[interval].maxlen)
                self.add_candles(interval, incoming)
        except Exception as e:
            logger.error(f"Unhandled error updating cache for {interval}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())

    def _with_datetime_timestamps(self, interval: str, frame: pd.DataFrame) -> Optional[pd.DataFrame]:
        """Return ``frame`` with a datetime ``timestamp`` column, or None if no row survives."""
        try:
            if not pd.api.types.is_datetime64_any_dtype(frame['timestamp']):
                logger.debug(f"Converting timestamps to datetime for {interval}")
                frame['timestamp'] = pd.to_datetime(frame['timestamp'])
            return frame
        except Exception as e:
            logger.error(f"Error converting timestamps: {str(e)}")

        # Second attempt: coerce unparseable values to NaT and drop them.
        try:
            frame['timestamp'] = pd.to_datetime(frame['timestamp'], errors='coerce')
            frame = frame.dropna(subset=['timestamp'])
        except Exception as e2:
            logger.error(f"Second attempt to convert timestamps failed: {str(e2)}")
            return None
        if frame.empty:
            logger.warning(f"All timestamps conversion failed for {interval}")
            return None
        return frame

    def _last_cached_timestamp(self, interval: str):
        """Return the newest cached candle's timestamp, or None; never mutates the candle."""
        last_cached_candle = self.candles[interval][-1]
        if not isinstance(last_cached_candle, dict):
            logger.warning(f"Last cached candle is not a dictionary for {interval}")
            return None
        if 'timestamp' not in last_cached_candle:
            logger.warning(f"No timestamp in last cached candle for {interval}")
            return None
        return last_cached_candle['timestamp']

    def _backfill_missing(self, interval: str, incoming: pd.DataFrame):
        """Add candles whose timestamp is not cached yet, then restore chronological order."""
        try:
            cached_timestamps = set()
            for candle in self.candles[interval]:
                if not (isinstance(candle, dict) and 'timestamp' in candle):
                    continue
                ts = candle['timestamp']
                if isinstance(ts, (pd.Timestamp, datetime)):
                    cached_timestamps.add(ts)
                    continue
                try:
                    cached_timestamps.add(pd.to_datetime(ts))
                except Exception as ts_error:
                    # Best effort: an unparseable cached timestamp is skipped.
                    logger.debug(f"Skipping unparseable cached timestamp {ts!r}: {str(ts_error)}")

            missing_candles = incoming[~incoming['timestamp'].isin(cached_timestamps)]
            if missing_candles.empty:
                logger.debug(f"No new or missing candles to add for {interval}")
                return

            logger.info(f"Found {len(missing_candles)} missing candles for {interval}")
            self.add_candles(interval, missing_candles)
            # Back-filled candles are older than the previous tail; re-sort
            # so that [-1] is the newest candle again.
            self._sort_chronologically(interval)
        except Exception as missing_error:
            logger.error(f"Error checking for missing candles: {str(missing_error)}")

    def _sort_chronologically(self, interval: str):
        """Re-order one timeframe's deque by timestamp; leave it untouched if that fails."""
        bucket = self.candles[interval]
        try:
            ordered = sorted(bucket, key=lambda candle: pd.to_datetime(candle['timestamp']))
        except (KeyError, TypeError, ValueError) as sort_error:
            logger.warning(f"Could not re-sort {interval} cache by timestamp: {str(sort_error)}")
            return
        self.candles[interval] = deque(ordered, maxlen=bucket.maxlen)

    def get_candles(self, timeframe: str, count: int = 500) -> pd.DataFrame:
        """
        Get candles for a specific timeframe. This is an alias for get_recent_candles
        to maintain compatibility with code that expects this method name.

        Args:
            timeframe: The timeframe interval ('1s', '1m', '1h', '1d')
            count: Maximum number of candles to return

        Returns:
            DataFrame containing the candles (empty if an error occurs)
        """
        try:
            logger.debug(f"Getting {count} candles for {timeframe} via get_candles()")
            return self.get_recent_candles(timeframe, count)
        except Exception as e:
            logger.error(f"Error in get_candles for {timeframe}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            return pd.DataFrame()
class RealTimeChart:
"""Real-time chart using Dash and Plotly"""
def __init__(self, symbol="BTC/USDT", use_sample_data=False, log_no_ticks_warning=True):
    """Set up chart state, data stores and the Dash application.

    Args:
        symbol: Trading pair symbol (e.g., BTC/USDT)
        use_sample_data: Forwarded to the tick storage; whether sample
            data may be used when no real data is available
        log_no_ticks_warning: Forwarded to the tick storage; whether to
            log warnings when no ticks are available
    """
    self.symbol = symbol
    self.use_sample_data = use_sample_data

    # State shown in the trading-info annotation.
    self.current_signal = 'HOLD'
    self.signal_time = datetime.now()
    self.current_position = 0.0
    self.session_balance = 100.0  # session starts with $100
    self.session_pnl = 0.0

    # Neural-network signals and executed trades collected so far.
    self.nn_signals = []
    self.trades = []

    # tz_name is the module-level timezone resolved elsewhere in the file.
    logger.info(f"Using timezone: {tz_name}")

    logger.info(f"Initializing RealTimeChart for {symbol}")
    # Raw ticks are retained for one hour.
    self.tick_storage = TradeTickStorage(
        symbol=symbol,
        max_age_seconds=3600,
        use_sample_data=use_sample_data,
        log_no_ticks_warning=log_no_ticks_warning
    )

    # Legacy candlestick container, kept for backward compatibility.
    self.candlestick_data = CandlestickData(max_length=5000)
    self.candle_cache = CandleCache(max_candles=5000)

    # One OHLCV slot per supported timeframe, all empty until loaded.
    timeframe_keys = (
        '1s', '5s', '15s', '60s', '300s', '900s', '3600s',
        '1m', '5m', '15m', '1h', '1d',
    )
    self.ohlcv_cache = dict.fromkeys(timeframe_keys)

    self.historical_data = BinanceHistoricalData()

    # True until the first render, to force data loading.
    self.first_render = True
    # Time candles were last written to disk.
    self.last_cache_save_time = time.time()

    viewport_meta = {"name": "viewport", "content": "width=device-width, initial-scale=1"}
    self.app = dash.Dash(
        __name__,
        external_stylesheets=[dbc.themes.DARKLY],
        suppress_callback_exceptions=True,
        meta_tags=[viewport_meta]
    )

    # Build the layout and register callbacks.
    self._setup_app_layout()
def _setup_app_layout(self):
    """Assign the tabbed Dash layout and register the chart callbacks."""
    # Look of an unselected interval button.
    inactive_style = dict(
        backgroundColor='#2C2C2C',
        color='white',
        border='none',
        padding='10px 15px',
        margin='5px',
        borderRadius='5px',
        cursor='pointer',
        fontWeight='bold',
    )
    # The selected button: same base, green background plus a shadow.
    selected_style = dict(
        inactive_style,
        backgroundColor='#4CAF50',
        boxShadow='0 2px 4px rgba(0,0,0,0.5)',
    )

    # Only the chart tab exists; the ticks tab and its callback were
    # removed because they caused errors.
    chart_tab = dbc.Tab(
        self._get_chart_layout(inactive_style, selected_style),
        label="Chart",
        tab_id="chart-tab",
    )
    self.app.layout = dbc.Tabs([chart_tab], id="tabs")

    self._setup_interval_callback(inactive_style, selected_style)
    self._setup_chart_callback()
def _get_chart_layout(self, button_style, active_button_style):
    """Build the chart tab: title, interval buttons, refresh timer and graph.

    Args:
        button_style: Style dict for unselected interval buttons.
        active_button_style: Style dict for the selected button ('1s'
            initially).

    Returns:
        The ``html.Div`` holding the whole chart page.
    """
    title = html.H2(f"{self.symbol} Real-Time Chart", style={
        'textAlign': 'center',
        'color': '#FFFFFF',
        'marginBottom': '10px'
    })

    # Currently selected interval in seconds, shared with the callbacks.
    interval_store = dcc.Store(id='interval-store', data={'interval': 1})

    # One button per selectable interval; the first starts highlighted.
    labels = ('1s', '5s', '15s', '30s', '1m')
    buttons = [
        html.Button(
            label,
            id=f'btn-{label}',
            n_clicks=0,
            style=active_button_style if position == 0 else button_style,
        )
        for position, label in enumerate(labels)
    ]
    button_row = html.Div(buttons, style={
        'display': 'flex',
        'justifyContent': 'center',
        'marginBottom': '15px'
    })

    # Fires every 500 ms to drive chart refreshes.
    refresh_timer = dcc.Interval(
        id='interval-component',
        interval=500,
        n_intervals=0
    )

    chart = dcc.Graph(id='live-chart', style={'height': '75vh'})

    footer = html.Div("Real-time trading chart with ML signals", style={
        'textAlign': 'center',
        'color': '#AAAAAA',
        'fontSize': '12px',
        'marginTop': '5px'
    })

    content = html.Div([title, interval_store, button_row, refresh_timer, chart, footer])
    return html.Div([content])
def _get_ticks_layout(self):
    """Build the raw-tick page: controls, stats cards, table and price chart.

    Returns:
        The ``html.Div`` holding the whole ticks page.
    """
    heading = html.H2(f"{self.symbol} Raw Tick Data (Last 5 Minutes)", style={
        'textAlign': 'center',
        'color': '#FFFFFF',
        'margin': '10px 0'
    })

    refresh_button = html.Button('Refresh Data', id='refresh-ticks-btn', n_clicks=0, style={
        'backgroundColor': '#4CAF50',
        'color': 'white',
        'padding': '10px 20px',
        'margin': '10px auto',
        'border': 'none',
        'borderRadius': '5px',
        'fontSize': '14px',
        'cursor': 'pointer',
        'display': 'block'
    })

    # Dropdown values are window lengths in seconds; 5 minutes by default.
    window_options = [
        {'label': 'Last 1 minute', 'value': 60},
        {'label': 'Last 5 minutes', 'value': 300},
        {'label': 'Last 15 minutes', 'value': 900},
        {'label': 'Last 30 minutes', 'value': 1800},
    ]
    window_selector = html.Div([
        html.Label("Time Window:", style={'color': 'white', 'marginRight': '10px'}),
        dcc.Dropdown(
            id='time-window-dropdown',
            options=window_options,
            value=300,
            style={'width': '200px', 'backgroundColor': '#2C2C2C', 'color': 'black'}
        )
    ], style={
        'display': 'flex',
        'alignItems': 'center',
        'justifyContent': 'center',
        'margin': '10px'
    })

    controls = html.Div([heading, refresh_button, window_selector], style={
        'backgroundColor': '#2C2C2C',
        'padding': '10px',
        'borderRadius': '5px',
        'marginBottom': '15px'
    })

    # Empty containers, identified by id.
    stats_cards = html.Div(id='tick-stats-cards', style={
        'display': 'flex',
        'flexWrap': 'wrap',
        'justifyContent': 'space-around',
        'marginBottom': '15px'
    })
    ticks_table = html.Div(id='ticks-table-container', style={
        'backgroundColor': '#232323',
        'padding': '10px',
        'borderRadius': '5px',
        'overflowX': 'auto'
    })

    price_section = html.Div([
        html.H3("Price Movement", style={
            'textAlign': 'center',
            'color': '#FFFFFF',
            'margin': '10px 0'
        }),
        dcc.Graph(id='tick-price-chart')
    ], style={
        'backgroundColor': '#232323',
        'padding': '10px',
        'borderRadius': '5px',
        'marginTop': '15px'
    })

    return html.Div([controls, stats_cards, ticks_table, price_section])
def _setup_interval_callback(self, button_style, active_button_style):
    """Register the callback that tracks the selected candle interval.

    Clicking an interval button stores its length in seconds in
    'interval-store' and highlights that button.

    Args:
        button_style: Style dict for unselected buttons.
        active_button_style: Style dict for the selected button.
    """
    # Button id -> interval in seconds, in display order.
    button_seconds = (
        ('btn-1s', 1),
        ('btn-5s', 5),
        ('btn-15s', 15),
        ('btn-30s', 30),
        ('btn-1m', 60),
    )

    def styles_for(selected_seconds):
        # One style per button; only the matching interval is highlighted.
        return tuple(
            active_button_style if seconds == selected_seconds else button_style
            for _, seconds in button_seconds
        )

    @self.app.callback(
        [Output('interval-store', 'data')]
        + [Output(button_id, 'style') for button_id, _ in button_seconds],
        [Input(button_id, 'n_clicks') for button_id, _ in button_seconds],
        [dash.dependencies.State('interval-store', 'data')]
    )
    def update_interval(n1, n5, n15, n30, n60, data):
        ctx = dash.callback_context
        if not ctx.triggered:
            # Nothing clicked yet: 1s is selected.
            return ({'interval': 1}, *styles_for(1))

        clicked_id = ctx.triggered[0]['prop_id'].split('.')[0]
        for button_id, seconds in button_seconds:
            if clicked_id == button_id:
                return ({'interval': seconds}, *styles_for(seconds))

        # Unknown trigger: keep the stored interval and its highlight.
        current_interval = data.get('interval', 1)
        return (data, *styles_for(current_interval))
def _setup_chart_callback(self):
    """Register the callback that redraws 'live-chart'.

    The callback fires on every 'interval-component' tick and whenever the
    selected interval changes. It draws candles from the tick storage,
    falling back to ``self.ohlcv_cache`` when no ticks are available, and
    returns an error figure instead of raising.
    """
    @self.app.callback(
        Output('live-chart', 'figure'),
        [Input('interval-component', 'n_intervals'),
         Input('interval-store', 'data')]
    )
    def update_chart(n, interval_data):
        try:
            # The store may not have delivered data yet; default to 1s.
            interval = (interval_data or {}).get('interval', 1)
            logger.debug(f"Updating chart with interval {interval}")

            # Get candlesticks data
            try:
                df = self.tick_storage.get_candles(interval_seconds=interval)
                if df.empty:
                    # .get(): not every selectable interval has a cache
                    # slot (e.g. '30s'), and a missing slot must not raise.
                    cache_key = f'{interval}s' if interval < 60 else '1m'
                    cached = self.ohlcv_cache.get(cache_key)
                    if cached is not None:
                        df = cached
            except Exception as e:
                logger.error(f"Error getting candles: {str(e)}")
                df = pd.DataFrame()

            # Price panel on top (80%), volume panel below (20%).
            fig = make_subplots(
                rows=2, cols=1,
                vertical_spacing=0.08,
                subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume'),
                row_heights=[0.8, 0.2]
            )

            if not df.empty and 'open' in df.columns:
                # NOTE(review): x uses df.index; confirm the cached
                # fallback frames are indexed the same way as tick candles.
                fig.add_trace(
                    go.Candlestick(
                        x=df.index,
                        open=df['open'],
                        high=df['high'],
                        low=df['low'],
                        close=df['close'],
                        name='OHLC'
                    ),
                    row=1, col=1
                )

                # Pad the y-axis by 5% of the visible range on each side.
                low_min = df['low'].min()
                high_max = df['high'].max()
                price_range = high_max - low_min
                y_min = low_min - (price_range * 0.05)
                y_max = high_max + (price_range * 0.05)
                fig.update_yaxes(range=[y_min, y_max], row=1, col=1)

                # Green bar when the candle closed at or above its open.
                colors = ['rgba(0,255,0,0.7)' if close_price >= open_price else 'rgba(255,0,0,0.7)'
                          for open_price, close_price in zip(df['open'], df['close'])]
                fig.add_trace(
                    go.Bar(
                        x=df.index,
                        y=df['volume'],
                        name='Volume',
                        marker_color=colors
                    ),
                    row=2, col=1
                )

            # Add trading info annotation if available
            if hasattr(self, 'current_signal') and self.current_signal:
                signal_color = "#33DD33" if self.current_signal == "BUY" else "#FF4444" if self.current_signal == "SELL" else "#BBBBBB"

                # Small positions get four decimals, larger ones two.
                position_text = f"{self.current_position:.4f}" if self.current_position < 0.01 else f"{self.current_position:.2f}"

                info_text = (
                    f"Signal: {self.current_signal} | "
                    f"Position: {position_text} | "
                    f"Balance: ${self.session_balance:.2f} | "
                    f"PnL: {self.session_pnl:.4f}"
                )

                fig.add_annotation(
                    x=0.5, y=1.05,
                    xref="paper", yref="paper",
                    text=info_text,
                    showarrow=False,
                    font=dict(size=14, color="white"),
                    bgcolor="rgba(50,50,50,0.6)",
                    borderwidth=1,
                    borderpad=6,
                    align="center"
                )

            fig.update_layout(
                title_text=f"{self.symbol} Real-Time Data",
                title_x=0.5,
                xaxis_rangeslider_visible=False,
                height=700,
                template='plotly_dark',
                paper_bgcolor='rgba(0,0,0,0)',
                plot_bgcolor='rgba(25,25,50,1)'
            )

            fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)')
            fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)')
            return fig
        except Exception as e:
            # Never let the callback raise: show the error in the chart.
            logger.error(f"Error updating chart: {str(e)}")
            fig = go.Figure()
            fig.add_annotation(
                x=0.5, y=0.5,
                text=f"Error updating chart: {str(e)}",
                showarrow=False,
                font=dict(size=14, color="red"),
                xref="paper", yref="paper"
            )
            return fig
def _interval_to_seconds(self, interval_key: str) -> int:
"""Convert interval key to seconds"""
mapping = {
'1s': 1,
'1m': 60,
'1h': 3600,
'1d': 86400
}
return mapping.get(interval_key, 1)
async def start_websocket(self):
    """Collect live market data for ``self.symbol`` until cancelled.

    Connects an ``ExchangeWebSocket``, feeds every received message into
    ``self.tick_storage`` (and the legacy ``candlestick_data``), logs
    throughput every 10 seconds and a status report every 60 seconds.
    When the connection drops or an error occurs the socket is closed and,
    after a 5 second pause, the connection is attempted again. The outer
    loop never returns on its own.
    """
    ws = ExchangeWebSocket(self.symbol)
    connection_attempts = 0
    max_attempts = 10  # Maximum connection attempts before longer waiting period
    while True:  # Keep trying to maintain connection
        connection_attempts += 1
        if not await ws.connect():
            logger.error(f"Failed to connect to exchange for {self.symbol}")
            # Gradually increase wait time based on number of connection failures
            wait_time = min(5 * connection_attempts, 60)  # Cap at 60 seconds
            # NOTE(review): this message reports wait_time even when the
            # 120 second branch below is the one that actually sleeps.
            logger.warning(f"Waiting {wait_time} seconds before retry (attempt {connection_attempts})")
            if connection_attempts >= max_attempts:
                logger.warning(f"Reached {max_attempts} connection attempts, taking a longer break")
                await asyncio.sleep(120)  # 2 minutes wait after max attempts
                connection_attempts = 0  # Reset counter
            else:
                await asyncio.sleep(wait_time)
            continue
        # Successfully connected
        connection_attempts = 0
        try:
            logger.info(f"WebSocket connected for {self.symbol}, beginning data collection")
            tick_count = 0
            last_tick_count_log = time.time()
            last_status_report = time.time()
            # Track stats for reporting
            price_min = float('inf')
            price_max = float('-inf')
            price_last = None
            volume_total = 0
            start_collection_time = time.time()
            while True:
                if not ws.running:
                    logger.warning(f"WebSocket connection lost for {self.symbol}, breaking loop")
                    break
                data = await ws.receive()
                if data:
                    if data.get('type') == 'kline':
                        # Use kline data directly for candlestick
                        trade_data = {
                            'timestamp': data['timestamp'],
                            'price': data['price'],
                            'volume': data['volume'],
                            'open': data['open'],
                            'high': data['high'],
                            'low': data['low']
                        }
                        logger.debug(f"Received kline data: {data}")
                    else:
                        # Use trade data
                        trade_data = {
                            'timestamp': data['timestamp'],
                            'price': data['price'],
                            'volume': data['volume']
                        }
                    # Update stats
                    price = trade_data['price']
                    volume = trade_data['volume']
                    price_min = min(price_min, price)
                    price_max = max(price_max, price)
                    price_last = price
                    volume_total += volume
                    # Store raw tick in the tick storage
                    self.tick_storage.add_tick(trade_data)
                    tick_count += 1
                    # Also update the old candlestick data for backward compatibility
                    if hasattr(self, 'candlestick_data'):
                        self.candlestick_data.update_from_trade(trade_data)
                    # Log tick counts periodically
                    # NOTE(review): the original indentation was lost; the
                    # periodic logging is assumed to sit inside `if data:`
                    # (price_last is then never None here) - confirm.
                    current_time = time.time()
                    if current_time - last_tick_count_log >= 10:  # Log every 10 seconds
                        elapsed = current_time - last_tick_count_log
                        tps = tick_count / elapsed if elapsed > 0 else 0
                        logger.info(f"{self.symbol}: Collected {tick_count} ticks in last {elapsed:.1f}s ({tps:.2f} ticks/sec), total: {len(self.tick_storage.ticks)}")
                        last_tick_count_log = current_time
                        tick_count = 0
                        # Check if ticks are being converted to candles
                        if len(self.tick_storage.ticks) > 0:
                            sample_df = self.tick_storage.get_candles(interval_seconds=1)
                            logger.info(f"{self.symbol}: Sample candle count: {len(sample_df)}")
                    # Periodic status report (every 60 seconds)
                    if current_time - last_status_report >= 60:
                        elapsed_total = current_time - start_collection_time
                        logger.info(f"{self.symbol} Status Report:")
                        logger.info(f" Collection time: {elapsed_total:.1f} seconds")
                        logger.info(f" Price range: {price_min:.2f} - {price_max:.2f} (last: {price_last:.2f})")
                        logger.info(f" Total volume: {volume_total:.8f}")
                        logger.info(f" Active ticks in storage: {len(self.tick_storage.ticks)}")
                        # Reset stats for next period
                        last_status_report = current_time
                        price_min = float('inf') if price_last is None else price_last
                        price_max = float('-inf') if price_last is None else price_last
                        volume_total = 0
                # Pause 10 ms on every pass of the receive loop.
                await asyncio.sleep(0.01)
        except websockets.exceptions.ConnectionClosed as e:
            logger.error(f"WebSocket connection closed for {self.symbol}: {str(e)}")
        except Exception as e:
            logger.error(f"Error in WebSocket loop for {self.symbol}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
        finally:
            # Runs on a clean break as well as after an error.
            logger.info(f"Closing WebSocket connection for {self.symbol}")
            await ws.close()
        logger.info(f"Waiting 5 seconds before reconnecting {self.symbol} WebSocket...")
        await asyncio.sleep(5)
def run(self, host='localhost', port=8050):
    """Start data collection and serve the Dash app (blocks until it stops).

    Launches the WebSocket collector on a daemon thread, loads historical
    candles, then runs the Dash server. The candle cache is written to
    disk when the server exits, whether normally or through an error.

    Args:
        host: Hostname to run on
        port: Port to run on
    """
    logger.info(f"Starting Dash app on {host}:{port}")

    # Refuse to start without a layout.
    if not (hasattr(self, 'app') and self.app.layout):
        logger.error("App layout not initialized properly")
        return

    # The chart callback is driven by 'interval-component'; add one if
    # the layout does not mention it.
    layout_text = str(self.app.layout)
    if 'interval-component' not in layout_text:
        logger.warning("Interval component not found in layout, adding it")
        refresh_timer = dcc.Interval(
            id='interval-component',
            interval=500,  # 500ms for real-time updates
            n_intervals=0
        )
        self.app.layout.children.append(refresh_timer)

    # NOTE(review): this loop becomes the calling thread's current event
    # loop but is not otherwise used here; the collector thread below
    # creates its own loop through asyncio.run().
    asyncio.set_event_loop(asyncio.new_event_loop())

    # Daemon thread, so it does not keep the process alive on exit.
    self.websocket_thread = threading.Thread(
        target=lambda: asyncio.run(self.start_websocket()),
        daemon=True
    )
    self.websocket_thread.start()

    # Ensure historical data is loaded before starting
    self._load_historical_data()

    try:
        self.app.run(host=host, port=port, debug=False)
    except Exception as e:
        logger.error(f"Error running Dash app: {str(e)}")
    finally:
        # Persist candles regardless of how the server stopped.
        self._save_candles_to_disk(force=True)
        logger.info("Dash app stopped")
def _load_historical_data(self):
    """Load historical data for all timeframes from local cache and the Binance API.

    Step 1 reads each timeframe's CSV cache file. Step 2 asks the API for
    every timeframe except '1s' that did not reach 500 candles in step 1.
    A per-timeframe summary is logged at the end. Errors are logged, never
    raised.
    """
    try:
        logger.info(f"Loading historical data for {self.symbol}...")

        # Timeframe key -> candle length in seconds.
        intervals = {
            '1s': 1,
            '1m': 60,
            '1h': 3600,
            '1d': 86400
        }
        # True once a timeframe holds at least 500 candles.
        load_status = {interval_key: False for interval_key in intervals}

        logger.info("Step 1: Loading from local cache files...")
        for interval_key in intervals:
            load_status[interval_key] = self._load_interval_from_disk(interval_key)

        logger.info("Step 2: Fetching data from API for missing timeframes...")
        for interval_key, interval_seconds in intervals.items():
            # '1s' is never requested from the API.
            if interval_key == '1s' or load_status[interval_key]:
                logger.info(f"Skipping API fetch for {interval_key}: already loaded or 1s timeframe")
                continue
            if self._fetch_interval_from_api(interval_key, interval_seconds):
                load_status[interval_key] = True

        logger.info("Historical data load summary:")
        for interval_key in intervals:
            cached = self.ohlcv_cache[interval_key]
            count = len(cached) if cached is not None else 0
            status = "Success" if load_status[interval_key] else "Failed"
            if 0 < count < 500:
                status = "Partial"
            logger.info(f"{interval_key}: {count} candles - {status}")
    except Exception as e:
        logger.error(f"Error in _load_historical_data: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())

def _load_interval_from_disk(self, interval_key):
    """Load one timeframe from its CSV cache file; return True if >= 500 candles resulted."""
    try:
        cache_file = os.path.join(self.historical_data.cache_dir,
                                  f"{self.symbol.replace('/', '_')}_{interval_key}_candles.csv")
        logger.info(f"Checking for cached {interval_key} data at {cache_file}")
        if not os.path.exists(cache_file):
            logger.info(f"No cache file found for {interval_key}")
            return False

        # Cache files may be 3 days old for '1d', 1 day for everything else.
        file_age = time.time() - os.path.getmtime(cache_file)
        max_age = 259200 if interval_key == '1d' else 86400
        logger.info(f"Cache file age: {file_age:.1f}s, max allowed: {max_age}s")
        if file_age > max_age:
            logger.info(f"Cache file for {interval_key} is too old ({file_age:.1f}s)")
            return False

        logger.info(f"Loading {interval_key} candles from cache")
        cached_df = pd.read_csv(cache_file)
        if cached_df.empty:
            return False

        # Diagnostic info about the loaded data
        logger.info(f"Loaded {len(cached_df)} candles from {cache_file}")
        logger.info(f"Columns: {cached_df.columns.tolist()}")
        logger.info(f"First few rows: {cached_df.head(2).to_dict('records')}")

        # CSV stores timestamps as text; restore the datetime dtype.
        if 'timestamp' in cached_df.columns:
            try:
                if not pd.api.types.is_datetime64_any_dtype(cached_df['timestamp']):
                    cached_df['timestamp'] = pd.to_datetime(cached_df['timestamp'])
                    logger.info("Successfully converted timestamps to datetime")
            except Exception as e:
                logger.warning(f"Could not convert timestamp column for {interval_key}: {str(e)}")

        # Only keep the last 2000 candles for memory efficiency
        if len(cached_df) > 2000:
            cached_df = cached_df.tail(2000)
            logger.info(f"Truncated to last 2000 candles")

        self.candle_cache.add_candles(interval_key, cached_df)
        self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key, count=2000)
        logger.info(f"Successfully loaded {len(self.ohlcv_cache[interval_key])} cached {interval_key} candles")
        return len(self.ohlcv_cache[interval_key]) >= 500
    except Exception as e:
        logger.error(f"Error loading cached {interval_key} candles: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return False

def _fetch_interval_from_api(self, interval_key, interval_seconds):
    """Fetch one timeframe from the API and merge it in; return True if >= 500 candles resulted."""
    try:
        logger.info(f"Fetching {interval_key} candles from API for {self.symbol}")
        historical_df = self.historical_data.get_historical_candles(
            symbol=self.symbol,
            interval_seconds=interval_seconds,
            limit=500  # Get 500 candles
        )
        if historical_df.empty:
            logger.warning(f"No historical data available from API for {self.symbol} {interval_key}")
            return False

        logger.info(f"Loaded {len(historical_df)} historical candles for {self.symbol} {interval_key} from API")
        existing_df = self.ohlcv_cache[interval_key]
        if existing_df is not None and not existing_df.empty:
            # Only candles newer than what is already held are added,
            # which avoids duplicating the overlap.
            latest_time = existing_df['timestamp'].max()
            new_candles = historical_df[historical_df['timestamp'] > latest_time]
            if not new_candles.empty:
                logger.info(f"Adding {len(new_candles)} new candles to existing {interval_key} cache")
                self.candle_cache.add_candles(interval_key, new_candles)
        else:
            # No existing data, add all from API
            self.candle_cache.add_candles(interval_key, historical_df)

        self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key, count=2000)
        logger.info(f"Total {interval_key} candles in cache: {len(self.ohlcv_cache[interval_key])}")
        return len(self.ohlcv_cache[interval_key]) >= 500
    except Exception as e:
        logger.error(f"Error fetching {interval_key} data from API: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return False
def _save_candles_to_disk(self, force=False):
    """Save the current candle cache to disk for persistence between runs.

    Every non-empty series in ``self.candle_cache.candles`` is written as a
    CSV file named ``<symbol with '/' replaced by '_'>_<interval>_candles.csv``
    inside ``self.historical_data.cache_dir``. The directory is created if
    it does not exist yet.

    Saves are throttled: unless ``force`` is true, the call returns
    immediately when fewer than 300 seconds have passed since
    ``self.last_cache_save_time``.

    Args:
        force: When true, skip the 5-minute throttle and save right away.

    Returns:
        None. Failures are logged (message plus traceback) and never
        propagate to the caller.
    """
    try:
        # Only save if sufficient time has passed (every 5 minutes)
        current_time = time.time()
        if not force and current_time - self.last_cache_save_time < 300:  # 5 minutes
            return

        # Save each timeframe's candles to disk
        for interval_key, candles in self.candle_cache.candles.items():
            if not candles:
                continue

            df = pd.DataFrame(list(candles))
            if df.empty:
                continue

            # Ensure timestamp is properly formatted. A failed conversion is
            # not fatal: the column is written out as-is.
            if 'timestamp' in df.columns:
                try:
                    if not pd.api.types.is_datetime64_any_dtype(df['timestamp']):
                        df['timestamp'] = pd.to_datetime(df['timestamp'])
                except Exception:
                    # Narrowed from a bare ``except`` so KeyboardInterrupt /
                    # SystemExit are no longer swallowed here.
                    logger.warning(f"Could not convert timestamp column for {interval_key}")

            # Make sure the target directory exists; otherwise to_csv fails
            # and nothing is persisted.
            cache_dir = self.historical_data.cache_dir
            if cache_dir:
                os.makedirs(cache_dir, exist_ok=True)

            cache_file = os.path.join(
                cache_dir,
                f"{self.symbol.replace('/', '_')}_{interval_key}_candles.csv",
            )
            df.to_csv(cache_file, index=False)
            logger.info(f"Saved {len(df)} {interval_key} candles to {cache_file}")

        # The throttle window restarts only after a pass that did not raise.
        self.last_cache_save_time = current_time
        logger.info(f"Saved all candle caches to disk at {datetime.now()}")
    except Exception as e:
        logger.error(f"Error saving candles to disk: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
def add_nn_signal(self, signal_type, timestamp, probability=None):
    """Record a neural network signal so it can be displayed on the chart.

    Args:
        signal_type: One of 'BUY', 'SELL' or 'HOLD'; anything else is
            logged as a warning and ignored.
        timestamp: When the signal applies. Accepts a datetime, an ISO-8601
            string (a 'Z' is rewritten to '+00:00'), or a number that is
            divided by 1000 before being passed to datetime.fromtimestamp.
            If conversion raises, the current time is used instead.
        probability: Optional probability/confidence value stored with the
            signal.
    """
    allowed = ('BUY', 'SELL', 'HOLD')
    if signal_type not in allowed:
        logger.warning(f"Invalid NN signal type: {signal_type}")
        return

    # Normalise the timestamp into a datetime where we know how to
    signal_time = timestamp
    if not isinstance(signal_time, datetime):
        try:
            if isinstance(signal_time, str):
                iso_text = signal_time.replace('Z', '+00:00')
                signal_time = datetime.fromisoformat(iso_text)
            elif isinstance(signal_time, (int, float)):
                signal_time = datetime.fromtimestamp(signal_time / 1000.0)
        except Exception as e:
            logger.error(f"Error converting timestamp for NN signal: {str(e)}")
            signal_time = datetime.now()

    entry = {
        'type': signal_type,
        'timestamp': signal_time,
        'probability': probability,
        'added': datetime.now(),
    }
    self.nn_signals.append(entry)

    # Cap the history at the 50 newest signals
    if len(self.nn_signals) > 50:
        self.nn_signals = self.nn_signals[-50:]

    logger.info(f"Added NN signal: {signal_type} at {signal_time}")
def add_trade(self, price, timestamp, pnl=None, amount=0.1, action=None, type=None):
    """Add a trade to be displayed on the chart.

    Args:
        price: The price at which the trade was executed.
        timestamp: The timestamp for the trade. Accepts a datetime, an
            ISO-8601 string (a 'Z' is rewritten to '+00:00'), or a number
            that is divided by 1000 before being passed to
            datetime.fromtimestamp. If conversion raises, the current time
            is used instead.
        pnl: Optional profit and loss value for the trade.
        amount: Amount traded.
        action: The type of trade (BUY or SELL) - alternative to ``type``.
        type: The type of trade (BUY or SELL) - alternative to ``action``.
            Takes precedence over ``action`` when truthy.

    The trade type may be a string (case-insensitive) or an int (0 means
    BUY, anything else SELL). Missing or unrecognised values are logged as
    a warning and default to BUY. Only the 50 most recent trades are kept.
    """
    # NOTE: the ``type`` parameter shadows the builtin inside this method
    # (the name is kept for backward compatibility), so the builtin must not
    # be called here; use ``obj.__class__`` instead.
    trade_type = type or action

    # Default to BUY if trade_type is None or not specified
    if trade_type is None:
        logger.warning(f"Trade type not specified in add_trade call, defaulting to BUY. Price: {price}, Timestamp: {timestamp}")
        trade_type = "BUY"

    if isinstance(trade_type, int):
        trade_type = "BUY" if trade_type == 0 else "SELL"

    # Ensure trade_type is uppercase if it's a string
    if isinstance(trade_type, str):
        trade_type = trade_type.upper()

    if trade_type not in ['BUY', 'SELL']:
        # Previously this called ``type(trade_type)``, i.e. the shadowing
        # parameter, and raised TypeError instead of falling back to BUY.
        value_type_name = trade_type.__class__.__name__
        logger.warning(f"Invalid trade type: {trade_type} (value type: {value_type_name}), defaulting to BUY. Price: {price}, Timestamp: {timestamp}")
        trade_type = "BUY"

    # Convert timestamp to datetime if it's not already
    if not isinstance(timestamp, datetime):
        try:
            if isinstance(timestamp, str):
                timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
            elif isinstance(timestamp, (int, float)):
                timestamp = datetime.fromtimestamp(timestamp / 1000.0)
        except Exception as e:
            logger.error(f"Error converting timestamp for trade: {str(e)}")
            timestamp = datetime.now()

    # Initialize trades list if it doesn't exist
    if not hasattr(self, 'trades'):
        self.trades = []

    # Add the trade to our list
    self.trades.append({
        'type': trade_type,
        'price': price,
        'timestamp': timestamp,
        'amount': amount,
        'pnl': pnl,
        'added': datetime.now()
    })

    # Only keep the most recent 50 trades
    if len(self.trades) > 50:
        self.trades = self.trades[-50:]

    logger.info(f"Added trade: {trade_type} {amount} at price {price} at time {timestamp} with PnL: {pnl}")
def update_trading_info(self, signal=None, position=None, balance=None, pnl=None):
    """Refresh the trading status values shown on the chart.

    Every argument is optional; an argument left as None keeps the value
    already stored on the instance.

    Args:
        signal: Current signal ('BUY', 'SELL' or 'HOLD'). Other non-None
            values are logged as a warning and ignored. A valid signal also
            stamps ``signal_time`` with the current time.
        position: Current position size.
        balance: Current session balance.
        pnl: Current session PnL.
    """
    if signal is not None:
        if signal not in ('BUY', 'SELL', 'HOLD'):
            logger.warning(f"Invalid signal type: {signal}")
        else:
            self.current_signal = signal
            self.signal_time = datetime.now()

    # Copy across whichever of the remaining values were actually supplied
    supplied = (
        ('current_position', position),
        ('session_balance', balance),
        ('session_pnl', pnl),
    )
    for attr_name, new_value in supplied:
        if new_value is not None:
            setattr(self, attr_name, new_value)

    logger.debug(f"Updated trading info: Signal={self.current_signal}, Position={self.current_position}, Balance=${self.session_balance:.2f}, PnL={self.session_pnl:.4f}")
async def main():
    """Start one real-time chart per symbol and keep the process alive.

    For each symbol a ``RealTimeChart`` is created, its ``start_websocket``
    coroutine is scheduled as an asyncio task, and its ``run`` method is
    started in a daemon thread on port ``8050 + index`` so the Dash server
    does not block the event loop. The coroutine then sleeps in a loop
    until it is interrupted or cancelled, at which point all websocket
    tasks are cancelled and awaited.

    The created charts are published through the module-level ``charts``
    global so the neural network integration can reach them.
    """
    global charts  # Make charts globally accessible for NN integration
    symbols = ["ETH/USDT", "BTC/USDT"]
    logger.info(f"Starting application for symbols: {symbols}")

    # Initialize neural network if enabled
    if NN_ENABLED:
        logger.info("Initializing Neural Network integration...")
        if setup_neural_network():
            logger.info("Neural Network integration initialized successfully")
        else:
            logger.warning("Neural Network integration failed to initialize")

    charts = []
    websocket_tasks = []

    # Create a chart and websocket task for each symbol
    for symbol in symbols:
        chart = RealTimeChart(symbol)
        charts.append(chart)
        websocket_tasks.append(asyncio.create_task(chart.start_websocket()))

    # Run Dash in a separate thread to not block the event loop
    server_threads = []
    for i, chart in enumerate(charts):
        port = 8050 + i  # Use different ports for each chart
        logger.info(f"Starting chart for {chart.symbol} on port {port}")
        # Defaults bind this iteration's chart/port to the lambda
        thread = Thread(target=lambda c=chart, p=port: c.run(port=p))
        thread.daemon = True
        thread.start()
        server_threads.append(thread)
        logger.info(f"Thread started for {chart.symbol} on port {port}")

    try:
        # Keep the main task running
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
    finally:
        # Cancel everything first, then collect all results in one go.
        # Awaiting tasks one by one would re-raise the stored exception of a
        # task that had already failed, aborting cleanup of the rest.
        for task in websocket_tasks:
            task.cancel()
        results = await asyncio.gather(*websocket_tasks, return_exceptions=True)
        for chart, result in zip(charts, results):
            if isinstance(result, asyncio.CancelledError):
                continue  # expected outcome of task.cancel()
            if isinstance(result, Exception):
                logger.error(f"Websocket task for {chart.symbol} ended with error: {result}")
if __name__ == "__main__":
    # Script entry point: run the async main() on a fresh event loop.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the app; log it instead of
        # letting the traceback reach the console.
        logger.info("Application terminated by user")