import asyncio
import json
import logging

from typing import Dict, List, Optional
import websockets
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import dash
from dash import html, dcc
from dash.dependencies import Input, Output
import pandas as pd
import numpy as np
from collections import deque
import time
from threading import Thread
import requests
import os
from datetime import datetime, timedelta
import pytz
import tzlocal
import threading
import random
import dash_bootstrap_components as dbc

# Configure logging with a detailed format
logging.basicConfig(
    level=logging.INFO,  # set to logging.DEBUG for more detailed logs
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('realtime_chart.log')
    ]
)
logger = logging.getLogger(__name__)

# Neural Network integration (conditional import)
NN_ENABLED = os.environ.get('ENABLE_NN_MODELS', '0') == '1'
nn_orchestrator = None
nn_inference_thread = None

if NN_ENABLED:
    try:
        import sys
        # Add the project root to sys.path if needed
        project_root = os.path.dirname(os.path.abspath(__file__))
        if project_root not in sys.path:
            sys.path.append(project_root)

        from NN.main import NeuralNetworkOrchestrator
        logger.info("Neural Network module enabled")
    except ImportError as e:
        logger.warning(f"Failed to import Neural Network module, disabling NN features: {str(e)}")
        NN_ENABLED = False

# NN utility functions
def setup_neural_network():
    """Initialize the neural network components if enabled"""
    global nn_orchestrator, NN_ENABLED

    if not NN_ENABLED:
        return False

    try:
        # Get configuration from environment variables or use defaults
        symbol = os.environ.get('NN_SYMBOL', 'BTC/USDT')
        timeframes = os.environ.get('NN_TIMEFRAMES', '1m,5m,1h,4h,1d').split(',')
        output_size = int(os.environ.get('NN_OUTPUT_SIZE', '3'))  # 3 for BUY/HOLD/SELL

        # Configure the orchestrator
        config = {
            'symbol': symbol,
            'timeframes': timeframes,
            'window_size': int(os.environ.get('NN_WINDOW_SIZE', '20')),
            'n_features': 5,  # OHLCV
            'output_size': output_size,
            'model_dir': 'NN/models/saved',
            'data_dir': 'NN/data'
        }

        # Initialize the orchestrator
        logger.info(f"Initializing Neural Network Orchestrator with config: {config}")
        nn_orchestrator = NeuralNetworkOrchestrator(config)

        # Start the inference thread if enabled
        inference_interval = int(os.environ.get('NN_INFERENCE_INTERVAL', '60'))
        if inference_interval > 0:
            start_nn_inference_thread(inference_interval)

        return True
    except Exception as e:
        logger.error(f"Error setting up neural network: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        NN_ENABLED = False
        return False

def start_nn_inference_thread(interval_seconds):
    """Start a background thread to periodically run inference with the neural network"""
    global nn_inference_thread

    if not NN_ENABLED or nn_orchestrator is None:
        logger.warning("Cannot start inference thread - Neural Network not enabled or initialized")
        return False

    def inference_worker():
        """Worker function for the inference thread"""
        model_type = os.environ.get('NN_MODEL_TYPE', 'cnn')
        timeframe = os.environ.get('NN_TIMEFRAME', '1h')

        logger.info(f"Starting neural network inference thread with {interval_seconds}s interval")
        logger.info(f"Using model type: {model_type}, timeframe: {timeframe}")

        # Wait a bit for the charts to initialize
        time.sleep(5)

        # Track active charts
        active_charts = []

        while True:
            try:
                # Find active charts if we don't have them yet
                if not active_charts and 'charts' in globals():
                    active_charts = globals()['charts']
                    logger.info(f"Found {len(active_charts)} active charts for NN signals")

                # Run inference
                result = nn_orchestrator.run_inference_pipeline(
                    model_type=model_type,
                    timeframe=timeframe
                )

                if result:
                    # Log the result
                    logger.info(f"Neural network inference result: {result}")

                    # Add the signal to the charts
                    if active_charts:
                        try:
                            if 'action' in result:
                                action = result['action']
                                timestamp = datetime.fromisoformat(result['timestamp'].replace('Z', '+00:00'))

                                # Get the probability if available
                                probability = None
                                if 'probability' in result:
                                    probability = result['probability']
                                elif 'probabilities' in result:
                                    probability = result['probabilities'].get(action, None)

                                # Add the signal to each chart
                                for chart in active_charts:
                                    if hasattr(chart, 'add_nn_signal'):
                                        chart.add_nn_signal(action, timestamp, probability)
                        except Exception as e:
                            logger.error(f"Error adding NN signal to chart: {str(e)}")
                            import traceback
                            logger.error(traceback.format_exc())

                # Sleep for the interval
                time.sleep(interval_seconds)

            except Exception as e:
                logger.error(f"Error in inference thread: {str(e)}")
                import traceback
                logger.error(traceback.format_exc())
                time.sleep(5)  # Wait a bit before retrying

    # Create and start the thread
    nn_inference_thread = threading.Thread(target=inference_worker, daemon=True)
    nn_inference_thread.start()

    return True

# Try to get the local timezone, defaulting to Europe/Sofia (EET) if detection fails
try:
    local_timezone = tzlocal.get_localzone()
    # Get the timezone name safely: this may be a zoneinfo.ZoneInfo object
    # instead of a pytz timezone, so check for both attribute conventions
    try:
        if hasattr(local_timezone, 'zone'):
            tz_name = local_timezone.zone
        elif hasattr(local_timezone, 'key'):
            tz_name = local_timezone.key
        else:
            tz_name = str(local_timezone)
    except Exception:
        tz_name = "Local"
    logger.info(f"Detected local timezone: {local_timezone} ({tz_name})")
except Exception as e:
    logger.warning(f"Could not detect local timezone: {str(e)}. Defaulting to Europe/Sofia")
    local_timezone = pytz.timezone('Europe/Sofia')
    tz_name = "Europe/Sofia"

def convert_to_local_time(timestamp):
    """Convert a timestamp to the local timezone"""
    try:
        if isinstance(timestamp, pd.Timestamp):
            dt = timestamp.to_pydatetime()
        elif isinstance(timestamp, np.datetime64):
            dt = pd.Timestamp(timestamp).to_pydatetime()
        elif isinstance(timestamp, str):
            dt = pd.to_datetime(timestamp).to_pydatetime()
        else:
            dt = timestamp

        # If the datetime is naive (no timezone), assume it's UTC
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=pytz.UTC)

        # Convert to the local timezone
        local_dt = dt.astimezone(local_timezone)
        return local_dt
    except Exception as e:
        logger.error(f"Error converting timestamp to local time: {str(e)}")
        return timestamp

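# Illustrative usage sketch for convert_to_local_time (not executed by this
# module). Assuming the host timezone is Europe/Sofia (UTC+2 in winter), a
# naive timestamp is treated as UTC and shifted accordingly:
#
#     >>> convert_to_local_time(pd.Timestamp("2024-01-01 12:00:00"))
#     datetime.datetime(2024, 1, 1, 14, 0, tzinfo=...)
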
class TradeTickStorage:
    """Storage for trade ticks with a maximum age limit"""

    def __init__(self, symbol: str = None, max_age_seconds: int = 3600, use_sample_data: bool = True, log_no_ticks_warning: bool = False):  # 1 hour by default, up from 30 min
        """Initialize the tick storage

        Args:
            symbol: Trading symbol
            max_age_seconds: Maximum age for ticks to be stored
            use_sample_data: If True, generate sample ticks when no real ticks are available
            log_no_ticks_warning: If True, log a warning when no ticks are available
        """
        self.symbol = symbol
        self.ticks = []
        self.max_age_seconds = max_age_seconds
        self.last_cleanup_time = time.time()
        self.cleanup_interval = 60  # run cleanup every 60 seconds
        self.cache_dir = "cache"
        self.use_sample_data = use_sample_data
        self.log_no_ticks_warning = log_no_ticks_warning
        self.last_sample_price = 83000.0  # Starting price for sample data (for BTC)
        self.last_sample_time = time.time() * 1000  # Starting time for sample data
        self.last_tick_time = 0  # Timestamp (ms) of the most recent tick
        self.tick_count = 0  # Number of ticks added since startup

        # Create the cache directory if it doesn't exist
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)

        # Try to load cached ticks
        self._load_cached_ticks()

        logger.info(f"Initialized TradeTickStorage for {symbol} with max age: {max_age_seconds} seconds, cleanup interval: {self.cleanup_interval} seconds")

    def add_tick(self, tick: Dict):
        """Add a tick to storage

        Args:
            tick: Tick data dict with fields:
                price: The price
                volume: The volume
                timestamp: Timestamp in milliseconds
        """
        if not tick:
            return

        # Generate a timestamp if one is missing
        if 'timestamp' not in tick:
            tick['timestamp'] = int(time.time() * 1000)  # Current time in ms

        # Ensure the timestamp is an integer (milliseconds since epoch)
        if not isinstance(tick['timestamp'], int):
            try:
                # Try to convert from float or string
                tick['timestamp'] = int(float(tick['timestamp']))
            except (ValueError, TypeError):
                # If conversion fails, use the current time
                tick['timestamp'] = int(time.time() * 1000)

        # Set a default volume if not present
        if 'volume' not in tick:
            tick['volume'] = 0.01  # Default small volume

        # Add a copy of the tick to storage to avoid mutation
        self.ticks.append(tick.copy())

        # Keep track of the latest tick for stats
        self.last_tick_time = max(self.last_tick_time, tick['timestamp'])

        # Cache every 100 ticks to avoid data loss
        self.tick_count += 1
        if self.tick_count % 100 == 0:
            self._cache_ticks()

        # Periodically clean up old ticks
        if self.tick_count % 1000 == 0:
            self._cleanup()

    def _cleanup(self):
        """Remove ticks older than max_age_seconds"""
        # Current time in milliseconds
        now = int(time.time() * 1000)

        # Remove old ticks
        cutoff = now - (self.max_age_seconds * 1000)
        original_count = len(self.ticks)
        self.ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff]
        removed = original_count - len(self.ticks)

        if removed > 0:
            logger.debug(f"Cleaned up {removed} old ticks, remaining: {len(self.ticks)}")

    def _load_cached_ticks(self):
        """Load cached ticks from disk on startup"""
        # Create a symbol-specific filename
        symbol_safe = self.symbol.replace("/", "_").replace("-", "_").lower()
        cache_file = os.path.join(self.cache_dir, f"{symbol_safe}_recent_ticks.csv")

        if not os.path.exists(cache_file):
            logger.info(f"No cached ticks found for {self.symbol}")
            return

        try:
            # Check that the cache is fresh (less than 10 minutes old)
            file_age = time.time() - os.path.getmtime(cache_file)
            if file_age > 600:  # 10 minutes
                logger.info(f"Cached ticks for {self.symbol} are too old ({file_age:.1f}s), skipping")
                return

            # Load the cached ticks
            tick_df = pd.read_csv(cache_file)
            if tick_df.empty:
                logger.info(f"Cached ticks file for {self.symbol} is empty")
                return

            # Convert to a list of dicts and add to storage
            cached_ticks = tick_df.to_dict('records')
            self.ticks.extend(cached_ticks)
            logger.info(f"Loaded {len(cached_ticks)} cached ticks for {self.symbol} from {cache_file}")
        except Exception as e:
            logger.error(f"Error loading cached ticks for {self.symbol}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())

    def _cache_ticks(self):
        """Cache recent ticks to disk"""
        if not self.ticks:
            return

        # Keep ticks from the last 10 minutes
        now = int(time.time() * 1000)  # Current time in ms
        cutoff = now - (600 * 1000)  # 10 minutes in ms
        recent_ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff]

        if not recent_ticks:
            logger.debug("No recent ticks to cache")
            return

        # Create a symbol-specific filename
        symbol_safe = self.symbol.replace("/", "_").replace("-", "_").lower()
        cache_file = os.path.join(self.cache_dir, f"{symbol_safe}_recent_ticks.csv")

        # Save to disk
        try:
            tick_df = pd.DataFrame(recent_ticks)
            tick_df.to_csv(cache_file, index=False)
            logger.info(f"Cached {len(recent_ticks)} recent ticks for {self.symbol} to {cache_file}")
        except Exception as e:
            logger.error(f"Error caching ticks: {str(e)}")

    def get_latest_price(self) -> Optional[float]:
        """Get the latest price from the most recent tick"""
        if self.ticks:
            return self.ticks[-1].get('price')
        return None

    def get_price_stats(self) -> Dict:
        """Get stats about the prices in storage"""
        if not self.ticks:
            return {
                'min': None,
                'max': None,
                'latest': None,
                'count': 0,
                'age_seconds': 0
            }

        prices = [tick['price'] for tick in self.ticks]
        latest_timestamp = self.ticks[-1]['timestamp']
        oldest_timestamp = self.ticks[0]['timestamp']

        return {
            'min': min(prices),
            'max': max(prices),
            'latest': prices[-1],
            'count': len(prices),
            'age_seconds': (latest_timestamp - oldest_timestamp) / 1000
        }

    def get_ticks_as_df(self) -> pd.DataFrame:
        """Return ticks as a DataFrame"""
        if not self.ticks:
            logger.warning("No ticks available for DataFrame conversion")
            return pd.DataFrame()

        # Ensure we have fresh data
        self._cleanup()

        # Copy the tick list to avoid modifying the original data
        ticks_data = self.ticks.copy()

        # Sort so the latest ticks are at the end of the DataFrame
        ticks_data.sort(key=lambda x: x['timestamp'])

        df = pd.DataFrame(ticks_data)
        if not df.empty:
            logger.debug(f"Converting timestamps for {len(df)} ticks")
            # Ensure the timestamp column exists
            if 'timestamp' not in df.columns:
                logger.error("Tick data missing timestamp column")
                return pd.DataFrame()

            # Check the timestamp datatype before conversion
            sample_ts = df['timestamp'].iloc[0] if len(df) > 0 else None
            logger.debug(f"Sample timestamp before conversion: {sample_ts}, type: {type(sample_ts)}")

            # Convert timestamps to datetime
            try:
                df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
                logger.debug("Timestamps converted to datetime successfully")
                if len(df) > 0:
                    logger.debug(f"Sample converted timestamp: {df['timestamp'].iloc[0]}")
            except Exception as e:
                logger.error(f"Error converting timestamps: {str(e)}")
                import traceback
                logger.error(traceback.format_exc())
                return pd.DataFrame()
        return df

    def get_candles(self, interval_seconds: int = 1, start_time_ms: int = None, end_time_ms: int = None) -> pd.DataFrame:
        """Generate candlestick data from ticks

        Args:
            interval_seconds: Interval in seconds for each candle
            start_time_ms: Start time in milliseconds
            end_time_ms: End time in milliseconds

        Returns:
            DataFrame with candlestick data
        """
        # Get filtered ticks
        ticks = self.get_ticks_from_time(start_time_ms, end_time_ms)

        if not ticks:
            if self.use_sample_data:
                # Generate multiple sample ticks to create several candles
                current_time = int(time.time() * 1000)
                sample_ticks = []

                # Generate 20 ticks spaced half an interval apart, walking back in time
                for i in range(20):
                    # Base price with a slight trend
                    base_price = self.last_sample_price * (1 + 0.0001 * (10 - i))

                    # Add some randomness to the price
                    random_factor = random.uniform(-0.002, 0.002)  # Small random change
                    tick_price = base_price * (1 + random_factor)

                    # Create a timestamp with the appropriate offset
                    tick_time = current_time - (i * interval_seconds * 1000 // 2)

                    sample_tick = {
                        'price': tick_price,
                        'volume': random.uniform(0.01, 0.5),
                        'timestamp': tick_time,
                        'is_sample': True
                    }

                    sample_ticks.append(sample_tick)

                # Update the last sample values
                self.last_sample_price = sample_ticks[0]['price']
                self.last_sample_time = sample_ticks[0]['timestamp']

                # Add the sample ticks in chronological order
                for tick in sorted(sample_ticks, key=lambda x: x['timestamp']):
                    self.add_tick(tick)

                # Try again with the new ticks
                ticks = self.get_ticks_from_time(start_time_ms, end_time_ms)

                if not ticks and self.log_no_ticks_warning:
                    logger.warning("Still no ticks available after adding sample data")
            elif self.log_no_ticks_warning:
                logger.warning("No ticks available for candle formation")
                return pd.DataFrame(columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            else:
                return pd.DataFrame(columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])

        # Ensure ticks are up to date
        try:
            self._cleanup()
        except Exception as cleanup_error:
            logger.error(f"Error cleaning up ticks: {str(cleanup_error)}")

        df = pd.DataFrame(ticks)
        if df.empty:
            logger.warning("Tick DataFrame is empty after filtering/conversion")
            return pd.DataFrame()

        logger.info(f"Preparing to create candles from {len(df)} ticks with {interval_seconds}s interval")

        # First, ensure all required columns exist
        required_columns = ['timestamp', 'price', 'volume']
        for col in required_columns:
            if col not in df.columns:
                logger.error(f"Required column '{col}' missing from tick data")
                return pd.DataFrame()

        # Make sure the DataFrame has no duplicated timestamps before setting the index
        try:
            if 'timestamp' in df.columns:
                # Check for duplicate timestamps
                duplicate_count = df['timestamp'].duplicated().sum()
                if duplicate_count > 0:
                    logger.warning(f"Found {duplicate_count} duplicate timestamps, keeping the last occurrence")
                    # Keep the last occurrence of each timestamp
                    df = df.drop_duplicates(subset='timestamp', keep='last')

                # Convert timestamp to datetime if it isn't already
                if not pd.api.types.is_datetime64_any_dtype(df['timestamp']):
                    logger.debug("Converting timestamp to datetime")
                    # Try multiple approaches to convert timestamps
                    try:
                        # First, try to convert from milliseconds (integer timestamps)
                        if pd.api.types.is_integer_dtype(df['timestamp']):
                            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
                        else:
                            # Otherwise try the standard conversion
                            df['timestamp'] = pd.to_datetime(df['timestamp'])
                    except Exception as conv_error:
                        logger.error(f"Error converting timestamps to datetime: {str(conv_error)}")
                        # Try a fallback approach
                        try:
                            # Heuristic: values above 1e12 are milliseconds, otherwise seconds
                            if df['timestamp'].iloc[0] > 1000000000000:
                                df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
                            else:
                                df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
                        except Exception as fallback_error:
                            logger.error(f"Fallback timestamp conversion failed: {str(fallback_error)}")
                            return pd.DataFrame()

            # Use the timestamp column as the index for resampling
            df = df.set_index('timestamp')
        except Exception as prep_error:
            logger.error(f"Error preprocessing DataFrame for resampling: {str(prep_error)}")
            import traceback
            logger.error(traceback.format_exc())
            return pd.DataFrame()

        # Create the interval string for resampling - use 's' instead of the deprecated 'S'
        interval_str = f'{interval_seconds}s'

        # Resample to create OHLCV candles, with multiple fallback options
        logger.debug(f"Resampling with interval: {interval_str}")

        candles = None

        # First attempt - individual column resampling
        try:
            # Check that the price column exists and has enough data
            if 'price' not in df.columns:
                raise ValueError("Price column missing from DataFrame")

            if len(df) < 2:
                logger.warning("Not enough data points for resampling, using direct data")
                # For a single data point, create a single candle
                if len(df) == 1:
                    price_val = df['price'].iloc[0]
                    volume_val = df['volume'].iloc[0] if 'volume' in df.columns else 0
                    timestamp_val = df.index[0]

                    candles = pd.DataFrame({
                        'timestamp': [timestamp_val],
                        'open': [price_val],
                        'high': [price_val],
                        'low': [price_val],
                        'close': [price_val],
                        'volume': [volume_val]
                    })
                    return candles
                else:
                    # No data
                    return pd.DataFrame()

            # Resample and aggregate each column separately
            open_df = df['price'].resample(interval_str).first()
            high_df = df['price'].resample(interval_str).max()
            low_df = df['price'].resample(interval_str).min()
            close_df = df['price'].resample(interval_str).last()
            volume_df = df['volume'].resample(interval_str).sum()

            # Check for length mismatches before combining
            expected_length = len(open_df)
            if (len(high_df) != expected_length or
                    len(low_df) != expected_length or
                    len(close_df) != expected_length or
                    len(volume_df) != expected_length):
                logger.warning("Length mismatch in resampled columns, falling back to alternative method")
                raise ValueError("Length mismatch")

            # Combine into a single DataFrame
            candles = pd.DataFrame({
                'open': open_df,
                'high': high_df,
                'low': low_df,
                'close': close_df,
                'volume': volume_df
            })
            logger.debug(f"Successfully created {len(candles)} candles with individual column resampling")
        except Exception as resample_error:
            logger.error(f"Error in individual column resampling: {str(resample_error)}")

            # Second attempt - built-in agg method
            try:
                logger.debug("Trying fallback resampling method with agg()")
                candles = df.resample(interval_str).agg({
                    'price': ['first', 'max', 'min', 'last'],
                    'volume': 'sum'
                })
                # Flatten the MultiIndex columns
                candles.columns = ['open', 'high', 'low', 'close', 'volume']
                logger.debug(f"Successfully created {len(candles)} candles with agg() method")
            except Exception as agg_error:
                logger.error(f"Error in agg() resampling: {str(agg_error)}")

                # Third attempt - manual candle construction
                try:
                    logger.debug("Trying manual candle construction method")
                    resampler = df.resample(interval_str)
                    candle_data = []

                    for name, group in resampler:
                        if not group.empty:
                            candle = {
                                'timestamp': name,
                                'open': group['price'].iloc[0],
                                'high': group['price'].max(),
                                'low': group['price'].min(),
                                'close': group['price'].iloc[-1],
                                'volume': group['volume'].sum() if 'volume' in group.columns else 0
                            }
                            candle_data.append(candle)

                    if candle_data:
                        candles = pd.DataFrame(candle_data)
                        logger.debug(f"Successfully created {len(candles)} candles with manual method")
                    else:
                        logger.warning("No candles created with manual method")
                        return pd.DataFrame()
                except Exception as manual_error:
                    logger.error(f"Error in manual candle construction: {str(manual_error)}")
                    import traceback
                    logger.error(traceback.format_exc())
                    return pd.DataFrame()

        # Ensure the result isn't empty
        if candles is None or candles.empty:
            logger.warning("No candles were created after all resampling attempts")
            return pd.DataFrame()

        # Reset the index to get timestamp as a column
        try:
            candles = candles.reset_index()
        except Exception as reset_error:
            logger.error(f"Error resetting index: {str(reset_error)}")
            # Try to create a new DataFrame with the timestamp index as a column
            try:
                timestamp_col = candles.index.to_list()
                candles_dict = candles.to_dict('list')
                candles_dict['timestamp'] = timestamp_col
                candles = pd.DataFrame(candles_dict)
            except Exception as fallback_error:
                logger.error(f"Error in fallback index reset: {str(fallback_error)}")
                return pd.DataFrame()

        # Ensure there are no NaN values
        try:
            nan_count_before = candles.isna().sum().sum()
            if nan_count_before > 0:
                logger.warning(f"Found {nan_count_before} NaN values in candles, dropping them")

            candles = candles.dropna()
        except Exception as nan_error:
            logger.error(f"Error handling NaN values: {str(nan_error)}")
            # Try to fill NaN values instead of dropping them
            # (fillna(method=...) is deprecated, so use ffill()/bfill())
            try:
                candles = candles.ffill().bfill()
            except Exception:
                pass

        logger.debug(f"Generated {len(candles)} candles from {len(df)} ticks")
        return candles

    def get_candle_stats(self) -> Dict:
        """Get statistics about cached candles for different intervals"""
        stats = {}

        # Intervals (in seconds) to check
        intervals = [1, 5, 15, 60, 300, 900, 3600]

        for interval in intervals:
            candles = self.get_candles(interval_seconds=interval)
            count = len(candles) if not candles.empty else 0

            # Get the time range if we have candles
            time_range = None
            if count > 0:
                try:
                    start_time = candles['timestamp'].min()
                    end_time = candles['timestamp'].max()
                    if isinstance(start_time, pd.Timestamp):
                        start_time = start_time.strftime('%Y-%m-%d %H:%M:%S')
                    if isinstance(end_time, pd.Timestamp):
                        end_time = end_time.strftime('%Y-%m-%d %H:%M:%S')
                    time_range = f"{start_time} to {end_time}"
                except Exception:
                    time_range = "Unknown"

            stats[f"{interval}s"] = {
                'count': count,
                'time_range': time_range
            }

        return stats

    def get_ticks_from_time(self, start_time_ms: int = None, end_time_ms: int = None) -> List[Dict]:
        """Get ticks within a specific time range

        Args:
            start_time_ms: Start time in milliseconds (None for no lower bound)
            end_time_ms: End time in milliseconds (None for no upper bound)

        Returns:
            List of ticks within the time range
        """
        if not self.ticks:
            return []

        # Ensure ticks are up to date
        self._cleanup()

        # Apply time filters if specified
        filtered_ticks = self.ticks
        if start_time_ms is not None:
            filtered_ticks = [tick for tick in filtered_ticks if tick['timestamp'] >= start_time_ms]
        if end_time_ms is not None:
            filtered_ticks = [tick for tick in filtered_ticks if tick['timestamp'] <= end_time_ms]

        logger.debug(f"Retrieved {len(filtered_ticks)} ticks from time range {start_time_ms} to {end_time_ms}")
        return filtered_ticks

    def get_time_based_stats(self) -> Dict:
        """Get statistics about the ticks organized by time periods

        Returns:
            Dictionary with statistics for different time periods
        """
        if not self.ticks:
            return {
                'total_ticks': 0,
                'periods': {}
            }

        # Ensure ticks are up to date
        self._cleanup()

        now = int(time.time() * 1000)  # Current time in ms

        # Define the time periods to analyze
        periods = {
            '1min': now - (60 * 1000),
            '5min': now - (5 * 60 * 1000),
            '15min': now - (15 * 60 * 1000),
            '30min': now - (30 * 60 * 1000)
        }

        stats = {
            'total_ticks': len(self.ticks),
            'oldest_tick': self.ticks[0]['timestamp'] if self.ticks else None,
            'newest_tick': self.ticks[-1]['timestamp'] if self.ticks else None,
            'time_span_seconds': (self.ticks[-1]['timestamp'] - self.ticks[0]['timestamp']) / 1000 if self.ticks else 0,
            'periods': {}
        }

        # Calculate stats for each period
        for period_name, cutoff_time in periods.items():
            period_ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff_time]

            if period_ticks:
                prices = [tick['price'] for tick in period_ticks]
                volumes = [tick.get('volume', 0) for tick in period_ticks]

                period_stats = {
                    'tick_count': len(period_ticks),
                    'min_price': min(prices) if prices else None,
                    'max_price': max(prices) if prices else None,
                    'avg_price': sum(prices) / len(prices) if prices else None,
                    'last_price': period_ticks[-1]['price'] if period_ticks else None,
                    'total_volume': sum(volumes),
                    # period_name is e.g. '1min'; strip the 'min' suffix to get minutes
                    'ticks_per_second': len(period_ticks) / (int(period_name[:-3]) * 60) if period_ticks else 0
                }

                stats['periods'][period_name] = period_stats

        logger.debug(f"Generated time-based stats: {len(stats['periods'])} periods")
        return stats

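# Illustrative usage sketch for TradeTickStorage (not executed by this module).
# Prices and volumes below are made-up example values:
#
#     storage = TradeTickStorage(symbol="BTC/USDT", max_age_seconds=3600)
#     storage.add_tick({'price': 83000.0, 'volume': 0.05})  # timestamp auto-filled
#     storage.add_tick({'price': 83010.0, 'volume': 0.02, 'timestamp': int(time.time() * 1000)})
#     candles_1s = storage.get_candles(interval_seconds=1)  # OHLCV DataFrame
#     print(storage.get_price_stats())
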
class CandlestickData:
    """Incrementally builds fixed-interval OHLCV candles from individual trades"""

    def __init__(self, max_length: int = 300):
        self.timestamps = deque(maxlen=max_length)
        self.opens = deque(maxlen=max_length)
        self.highs = deque(maxlen=max_length)
        self.lows = deque(maxlen=max_length)
        self.closes = deque(maxlen=max_length)
        self.volumes = deque(maxlen=max_length)
        self.current_candle = {
            'timestamp': None,
            'open': None,
            'high': None,
            'low': None,
            'close': None,
            'volume': 0
        }
        self.candle_interval = 1  # 1 second by default

    def update_from_trade(self, trade: Dict):
        timestamp = trade['timestamp']
        price = trade['price']
        volume = trade.get('volume', 0)

        # Round the timestamp down to the nearest candle interval
        candle_timestamp = int(timestamp / (self.candle_interval * 1000)) * (self.candle_interval * 1000)

        if self.current_candle['timestamp'] != candle_timestamp:
            # Save the current candle if it exists
            if self.current_candle['timestamp'] is not None:
                self.timestamps.append(self.current_candle['timestamp'])
                self.opens.append(self.current_candle['open'])
                self.highs.append(self.current_candle['high'])
                self.lows.append(self.current_candle['low'])
                self.closes.append(self.current_candle['close'])
                self.volumes.append(self.current_candle['volume'])
                logger.debug(f"New candle saved: {self.current_candle}")

            # Start a new candle
            self.current_candle = {
                'timestamp': candle_timestamp,
                'open': price,
                'high': price,
                'low': price,
                'close': price,
                'volume': volume
            }
            logger.debug(f"New candle started: {self.current_candle}")
        else:
            # Update the current candle
            if self.current_candle['high'] is None or price > self.current_candle['high']:
                self.current_candle['high'] = price
            if self.current_candle['low'] is None or price < self.current_candle['low']:
                self.current_candle['low'] = price
            self.current_candle['close'] = price
            self.current_candle['volume'] += volume
            logger.debug(f"Updated current candle: {self.current_candle}")

    def get_dataframe(self) -> pd.DataFrame:
        # Include the in-progress candle in the DataFrame if it exists
        timestamps = list(self.timestamps)
        opens = list(self.opens)
        highs = list(self.highs)
        lows = list(self.lows)
        closes = list(self.closes)
        volumes = list(self.volumes)

        if self.current_candle['timestamp'] is not None:
            timestamps.append(self.current_candle['timestamp'])
            opens.append(self.current_candle['open'])
            highs.append(self.current_candle['high'])
            lows.append(self.current_candle['low'])
            closes.append(self.current_candle['close'])
            volumes.append(self.current_candle['volume'])

        df = pd.DataFrame({
            'timestamp': timestamps,
            'open': opens,
            'high': highs,
            'low': lows,
            'close': closes,
            'volume': volumes
        })
        if not df.empty:
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        return df

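# Illustrative usage sketch for CandlestickData (not executed by this module).
# The trade dicts mirror what BinanceWebSocket.receive() returns; values are
# made-up examples. Both trades fall in the same 1-second candle:
#
#     cd = CandlestickData(max_length=300)
#     cd.update_from_trade({'timestamp': 1700000000000, 'price': 83000.0, 'volume': 0.05})
#     cd.update_from_trade({'timestamp': 1700000000500, 'price': 83002.5, 'volume': 0.01})
#     df = cd.get_dataframe()  # one row: open=83000.0, close=83002.5, volume=0.06
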
class BinanceWebSocket:
    """Binance WebSocket implementation for real-time tick data"""

    def __init__(self, symbol: str):
        self.symbol = symbol.replace('/', '').lower()
        self.ws = None
        self.running = False
        self.reconnect_delay = 1
        self.max_reconnect_delay = 60
        self.message_count = 0

        # Binance WebSocket configuration
        self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@trade"
        logger.info(f"Initialized Binance WebSocket for symbol: {self.symbol}")

    async def connect(self):
        """Connect with exponential backoff until the WebSocket is established"""
        while True:
            try:
                logger.info(f"Attempting to connect to {self.ws_url}")
                self.ws = await websockets.connect(self.ws_url)
                logger.info("WebSocket connection established")

                self.running = True
                self.reconnect_delay = 1
                logger.info(f"Successfully connected to Binance WebSocket for {self.symbol}")
                return True
            except Exception as e:
                logger.error(f"WebSocket connection error: {str(e)}")
                await asyncio.sleep(self.reconnect_delay)
                # Double the delay on each failure, capped at max_reconnect_delay
                self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)

    async def receive(self) -> Optional[Dict]:
        """Receive and parse the next trade message; returns None for non-trade messages"""
        if not self.ws:
            return None

        try:
            message = await self.ws.recv()
            self.message_count += 1

            if self.message_count % 100 == 0:  # Log every 100th message to avoid spam
                logger.info(f"Received message #{self.message_count}")
                logger.debug(f"Raw message: {message[:200]}...")

            data = json.loads(message)

            # Process trade data
            if 'e' in data and data['e'] == 'trade':
                trade_data = {
                    'timestamp': data['T'],      # Trade time
                    'price': float(data['p']),   # Price
                    'volume': float(data['q']),  # Quantity
                    'type': 'trade'
                }
                logger.debug(f"Processed trade data: {trade_data}")
                return trade_data

            return None
        except websockets.exceptions.ConnectionClosed:
            logger.warning("WebSocket connection closed")
            self.running = False
            return None
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...")
            return None
        except Exception as e:
            logger.error(f"Error receiving message: {str(e)}")
            return None

    async def close(self):
        """Close the WebSocket connection"""
        if self.ws:
            await self.ws.close()
            self.running = False
            logger.info("WebSocket connection closed")

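# Illustrative usage sketch for BinanceWebSocket (not executed by this module):
# a minimal async consumer loop that feeds incoming ticks into a TradeTickStorage.
# The stream_ticks name is hypothetical:
#
#     async def stream_ticks():
#         ws = BinanceWebSocket("BTC/USDT")
#         storage = TradeTickStorage(symbol="BTC/USDT")
#         await ws.connect()
#         while ws.running:
#             tick = await ws.receive()
#             if tick:
#                 storage.add_tick(tick)
#
#     # asyncio.run(stream_ticks())
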
class BinanceHistoricalData:
    """Fetch historical candle data from Binance"""

    def __init__(self):
        self.base_url = "https://api.binance.com/api/v3/klines"
        # Create a cache directory if it doesn't exist
        self.cache_dir = os.path.join(os.getcwd(), "cache")
        os.makedirs(self.cache_dir, exist_ok=True)
        logger.info(f"Initialized BinanceHistoricalData with cache directory: {self.cache_dir}")

    def _get_interval_string(self, interval_seconds: int) -> str:
        """Convert an interval in seconds to a Binance interval string"""
        intervals = {
            60: "1m",
            300: "5m",
            900: "15m",
            1800: "30m",
            3600: "1h",
            14400: "4h",
            86400: "1d",
        }
        if interval_seconds in intervals:
            return intervals[interval_seconds]
        # Default to 1m if not recognized
        logger.warning(f"Unrecognized interval {interval_seconds}s, defaulting to 1m")
        return "1m"

    def _get_cache_filename(self, symbol: str, interval: str) -> str:
        """Generate the cache filename for the symbol and interval"""
        # Replace any slashes in the symbol with underscores
        safe_symbol = symbol.replace("/", "_")
        return os.path.join(self.cache_dir, f"{safe_symbol}_{interval}_candles.csv")

    def _load_from_cache(self, symbol: str, interval: str) -> Optional[pd.DataFrame]:
        """Load candle data from the cache if available and not expired"""
        filename = self._get_cache_filename(symbol, interval)

        if not os.path.exists(filename):
            logger.debug(f"No cache file found for {symbol} {interval}")
            return None

        # Check freshness: 1 day for the 1d interval, 1 hour for everything else
        file_age = time.time() - os.path.getmtime(filename)
        max_age = 86400 if interval == "1d" else 3600

        if file_age > max_age:
            logger.debug(f"Cache for {symbol} {interval} is expired ({file_age:.1f}s old)")
            return None

        try:
            df = pd.read_csv(filename)
            # Convert the timestamp strings back to datetime
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            logger.info(f"Loaded {len(df)} candles from cache for {symbol} {interval}")
            return df
        except Exception as e:
            logger.error(f"Error loading from cache: {str(e)}")
            return None

    def _save_to_cache(self, df: pd.DataFrame, symbol: str, interval: str) -> bool:
        """Save candle data to the cache"""
        if df.empty:
            logger.warning(f"No data to cache for {symbol} {interval}")
            return False

        filename = self._get_cache_filename(symbol, interval)
        try:
            df.to_csv(filename, index=False)
            logger.info(f"Cached {len(df)} candles for {symbol} {interval} to {filename}")
            return True
        except Exception as e:
            logger.error(f"Error saving to cache: {str(e)}")
            return False

    def get_historical_candles(self, symbol: str, interval_seconds: int, limit: int = 500) -> pd.DataFrame:
        """Get historical candle data for the specified symbol and interval"""
        # Convert to Binance format
        clean_symbol = symbol.replace("/", "")
        interval = self._get_interval_string(interval_seconds)

        # Try to load from the cache first
        cached_data = self._load_from_cache(symbol, interval)
        if cached_data is not None and len(cached_data) >= limit:
            return cached_data.tail(limit)

        # Fetch from the API if not cached or insufficient
        try:
            logger.info(f"Fetching {limit} historical candles for {symbol} ({interval}) from Binance API")

            params = {
                "symbol": clean_symbol,
                "interval": interval,
                "limit": limit
            }

            response = requests.get(self.base_url, params=params)
            response.raise_for_status()  # Raise an exception for HTTP errors

            # Process the data
            candles = response.json()

            if not candles:
                logger.warning(f"No candles returned from Binance for {symbol} {interval}")
                return pd.DataFrame()

            # Binance returns each kline as a list:
            # [
            #   [
            #     1499040000000,      // Open time
            #     "0.01634790",       // Open
            #     "0.80000000",       // High
            #     "0.01575800",       // Low
            #     "0.01577100",       // Close
            #     "148976.11427815",  // Volume
            #     ...                 // Ignore the rest
            #   ],
            #   ...
            # ]

            df = pd.DataFrame(candles, columns=[
                "timestamp", "open", "high", "low", "close", "volume",
                "close_time", "quote_asset_volume", "number_of_trades",
                "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
            ])

            # Convert types
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            for col in ["open", "high", "low", "close", "volume"]:
                df[col] = df[col].astype(float)

            # Keep only the needed columns
            df = df[["timestamp", "open", "high", "low", "close", "volume"]]

            # Cache the results
            self._save_to_cache(df, symbol, interval)

            logger.info(f"Successfully fetched {len(df)} candles for {symbol} {interval}")
            return df

        except Exception as e:
            logger.error(f"Error fetching historical data for {symbol} {interval}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            return pd.DataFrame()


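# Illustrative usage sketch for BinanceHistoricalData (not executed by this
# module). Fetches (or loads from the on-disk cache) the last 500 one-minute
# candles:
#
#     hist = BinanceHistoricalData()
#     df = hist.get_historical_candles("BTC/USDT", interval_seconds=60, limit=500)
#     print(df.tail())
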
class ExchangeWebSocket:
    """Generic WebSocket interface for cryptocurrency exchanges"""

    def __init__(self, symbol: str, exchange: str = "binance"):
        self.symbol = symbol
        self.exchange = exchange.lower()
        self.ws = None

        # Initialize the appropriate WebSocket implementation
        if self.exchange == "binance":
            self.ws = BinanceWebSocket(symbol)
        elif self.exchange == "mexc":
            self.ws = MEXCWebSocket(symbol)
        else:
            raise ValueError(f"Unsupported exchange: {exchange}")

    async def connect(self):
        """Connect to the exchange WebSocket"""
        return await self.ws.connect()

    async def receive(self) -> Optional[Dict]:
        """Receive data from the WebSocket"""
        return await self.ws.receive()

    async def close(self):
        """Close the WebSocket connection"""
        await self.ws.close()

    @property
    def running(self):
        """Check if the WebSocket is running"""
        return self.ws.running if self.ws else False

class CandleCache:
    """In-memory cache of recent candles per timeframe, backed by bounded deques"""

    def __init__(self, max_candles: int = 5000):
        self.candles = {
            '1s': deque(maxlen=max_candles),
            '1m': deque(maxlen=max_candles),
            '1h': deque(maxlen=max_candles),
            '1d': deque(maxlen=max_candles)
        }
        logger.info(f"Initialized CandleCache with max candles: {max_candles}")

    def add_candles(self, interval: str, new_candles: pd.DataFrame):
        if interval in self.candles and not new_candles.empty:
            # Convert each DataFrame row to a dict to avoid pandas issues
            for _, row in new_candles.iterrows():
                candle_dict = row.to_dict()
                self.candles[interval].append(candle_dict)
            logger.debug(f"Added {len(new_candles)} candles to {interval} cache")

    def get_recent_candles(self, interval: str, count: int = 500) -> pd.DataFrame:
        if interval in self.candles and self.candles[interval]:
            # Convert the deque to a list of dicts first
            all_candles = list(self.candles[interval])
            # Clamp the request to the number of candles we actually have
            if count > len(all_candles):
                logger.debug(f"Requested {count} candles, but only have {len(all_candles)} for {interval}")
                count = len(all_candles)

            recent_candles = all_candles[-count:]
            logger.debug(f"Returning {len(recent_candles)} recent candles for {interval} (requested {count})")

            # Create a DataFrame and ensure the timestamp column is datetime-typed
            df = pd.DataFrame(recent_candles)
            if not df.empty and 'timestamp' in df.columns:
                try:
                    if not pd.api.types.is_datetime64_any_dtype(df['timestamp']):
                        df['timestamp'] = pd.to_datetime(df['timestamp'])
                except Exception as e:
                    logger.warning(f"Error converting timestamps in get_recent_candles: {str(e)}")

            return df

        logger.debug(f"No candles available for {interval}")
        return pd.DataFrame()

    def update_cache(self, interval: str, new_candles: pd.DataFrame):
        """
        Update the candle cache for a specific timeframe with new candles

        Args:
            interval: The timeframe interval ('1s', '1m', '1h', '1d')
            new_candles: DataFrame with new candles to add to the cache
        """
        if interval not in self.candles:
            logger.warning(f"Invalid interval {interval} for cache update")
            return

        if new_candles is None or new_candles.empty:
            logger.debug(f"No new candles to update {interval} cache")
            return

        # Check that the timestamp column exists
        if 'timestamp' not in new_candles.columns:
            logger.warning(f"No timestamp column in new candles for {interval}")
            return

        try:
            # Ensure timestamps are datetime for proper comparison
            try:
                if not pd.api.types.is_datetime64_any_dtype(new_candles['timestamp']):
                    logger.debug(f"Converting timestamps to datetime for {interval}")
                    new_candles['timestamp'] = pd.to_datetime(new_candles['timestamp'])
            except Exception as e:
                logger.error(f"Error converting timestamps: {str(e)}")
                # Try a different approach
                try:
                    new_candles['timestamp'] = pd.to_datetime(new_candles['timestamp'], errors='coerce')
                    # Drop any rows where conversion failed
                    new_candles = new_candles.dropna(subset=['timestamp'])
                    if new_candles.empty:
                        logger.warning(f"All timestamp conversions failed for {interval}")
                        return
                except Exception as e2:
                    logger.error(f"Second attempt to convert timestamps failed: {str(e2)}")
                    return

            # Create a copy to avoid modifying the original
            new_candles_copy = new_candles.copy()

            # If we have no candles in the cache, add all new candles
            if not self.candles[interval]:
                logger.debug(f"No existing candles for {interval}, adding all {len(new_candles_copy)} candles")
                self.add_candles(interval, new_candles_copy)
                return

            # Get the timestamp from the last cached candle
            last_cached_candle = self.candles[interval][-1]
            if not isinstance(last_cached_candle, dict):
                logger.warning(f"Last cached candle is not a dictionary for {interval}")
                last_cached_candle = {'timestamp': None}

            if 'timestamp' not in last_cached_candle:
                logger.warning(f"No timestamp in last cached candle for {interval}")
                last_cached_candle['timestamp'] = None

            last_cached_time = last_cached_candle['timestamp']
            logger.debug(f"Last cached timestamp for {interval}: {last_cached_time}")

            # If last_cached_time is None, add all candles
            if last_cached_time is None:
                logger.debug(f"No valid last cached timestamp, adding all {len(new_candles_copy)} candles for {interval}")
                self.add_candles(interval, new_candles_copy)
                return

            # Convert last_cached_time to datetime if needed
            if not isinstance(last_cached_time, (pd.Timestamp, datetime)):
                try:
                    last_cached_time = pd.to_datetime(last_cached_time)
                except Exception as e:
                    logger.error(f"Cannot convert last cached time to datetime: {str(e)}")
                    # Add all candles as a fallback
                    self.add_candles(interval, new_candles_copy)
                    return

            # Make a backup of the current cache before filtering
            cache_backup = list(self.candles[interval])

            # Filter to new candles that are after the last cached candle
            try:
                filtered_candles = new_candles_copy[new_candles_copy['timestamp'] > last_cached_time]

                if not filtered_candles.empty:
                    logger.debug(f"Adding {len(filtered_candles)} new candles for {interval}")
                    self.add_candles(interval, filtered_candles)
                else:
                    # No new candles after the last cached time; check for missing candles
                    try:
                        # Collect the unique timestamps already in the cache
                        cached_timestamps = set()
                        for candle in self.candles[interval]:
                            if isinstance(candle, dict) and 'timestamp' in candle:
                                ts = candle['timestamp']
                                if isinstance(ts, (pd.Timestamp, datetime)):
                                    cached_timestamps.add(ts)
                                else:
                                    try:
                                        cached_timestamps.add(pd.to_datetime(ts))
                                    except Exception:
                                        pass

                        # Find candles in new_candles that aren't in the cache
                        missing_candles = new_candles_copy[~new_candles_copy['timestamp'].isin(cached_timestamps)]

                        if not missing_candles.empty:
                            logger.info(f"Found {len(missing_candles)} missing candles for {interval}")
                            self.add_candles(interval, missing_candles)
                        else:
                            logger.debug(f"No new or missing candles to add for {interval}")
                    except Exception as missing_error:
                        logger.error(f"Error checking for missing candles: {str(missing_error)}")
            except Exception as filter_error:
                logger.error(f"Error filtering candles by timestamp: {str(filter_error)}")
                # Restore from the backup
                self.candles[interval] = deque(cache_backup, maxlen=self.candles[interval].maxlen)
                # Try adding all candles as a fallback
                self.add_candles(interval, new_candles_copy)
        except Exception as e:
            logger.error(f"Unhandled error updating cache for {interval}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())

    def get_candles(self, timeframe: str, count: int = 500) -> pd.DataFrame:
        """
        Get candles for a specific timeframe. This is an alias for get_recent_candles
        to maintain compatibility with code that expects this method name.

        Args:
            timeframe: The timeframe interval ('1s', '1m', '1h', '1d')
            count: Maximum number of candles to return

        Returns:
            DataFrame containing the candles
        """
        try:
            logger.debug(f"Getting {count} candles for {timeframe} via get_candles()")
            return self.get_recent_candles(timeframe, count)
        except Exception as e:
            logger.error(f"Error in get_candles for {timeframe}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            return pd.DataFrame()

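# Illustrative usage sketch for CandleCache (not executed by this module).
# update_cache() only appends candles newer than the last cached timestamp,
# so it is safe to call repeatedly with overlapping DataFrames; one_minute_df
# below stands in for any OHLCV DataFrame with a 'timestamp' column:
#
#     cache = CandleCache(max_candles=5000)
#     cache.update_cache('1m', one_minute_df)
#     recent = cache.get_candles('1m', count=100)
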
class RealTimeChart:
    """Real-time chart using Dash and Plotly"""

    def __init__(self, symbol="BTC/USDT", use_sample_data=False, log_no_ticks_warning=True):
        """Initialize a new RealTimeChart

        Args:
            symbol: Trading pair symbol (e.g., BTC/USDT)
            use_sample_data: Whether to use sample data when no real data is available
            log_no_ticks_warning: Whether to log warnings when no ticks are available
        """
        self.symbol = symbol
        self.use_sample_data = use_sample_data

        # Initialize variables for the trading info display
        self.current_signal = 'HOLD'
        self.signal_time = datetime.now()
        self.current_position = 0.0
        self.session_balance = 100.0  # Start with $100 balance
        self.session_pnl = 0.0

        # Initialize NN signal and trade lists
        self.nn_signals = []
        self.trades = []

        # Use the module-level timezone instead of detecting it again
        logger.info(f"Using timezone: {tz_name}")

        # Initialize tick storage
        logger.info(f"Initializing RealTimeChart for {symbol}")
        self.tick_storage = TradeTickStorage(
            symbol=symbol,
            max_age_seconds=3600,  # Keep ticks for 1 hour
            use_sample_data=use_sample_data,
            log_no_ticks_warning=log_no_ticks_warning
        )

        # Initialize candlestick data for backward compatibility
        self.candlestick_data = CandlestickData(max_length=5000)

        # Initialize the candle cache
        self.candle_cache = CandleCache(max_candles=5000)

        # Initialize OHLCV cache entries for the different timeframes
        # ('30s' is included so the 30s interval button has a cache slot)
        self.ohlcv_cache = {
            '1s': None,
            '5s': None,
            '15s': None,
            '30s': None,
            '60s': None,
            '300s': None,
            '900s': None,
            '3600s': None,
            '1m': None,
            '5m': None,
            '15m': None,
            '1h': None,
            '1d': None
        }

        # Historical data handler
        self.historical_data = BinanceHistoricalData()

        # Flag for first render to force data loading
        self.first_render = True

        # Last time candles were saved to disk
        self.last_cache_save_time = time.time()

        # Initialize the Dash app
        self.app = dash.Dash(
            __name__,
            external_stylesheets=[dbc.themes.DARKLY],
            suppress_callback_exceptions=True,
            meta_tags=[{"name": "viewport", "content": "width=device-width, initial-scale=1"}]
        )

        # Set up the layout and callbacks
        self._setup_app_layout()

    def _setup_app_layout(self):
        """Set up the app layout and callbacks"""
        # Styling for the interval buttons
        button_style = {
            'backgroundColor': '#2C2C2C',
            'color': 'white',
            'border': 'none',
            'padding': '10px 15px',
            'margin': '5px',
            'borderRadius': '5px',
            'cursor': 'pointer',
            'fontWeight': 'bold'
        }

        active_button_style = {
            **button_style,
            'backgroundColor': '#4CAF50',
            'boxShadow': '0 2px 4px rgba(0,0,0,0.5)'
        }

        # Create the tab layout
        self.app.layout = dbc.Tabs([
            dbc.Tab(self._get_chart_layout(button_style, active_button_style), label="Chart", tab_id="chart-tab"),
            # The ticks tab was removed because it was causing errors
        ], id="tabs")

        # Set up callbacks
        self._setup_interval_callback(button_style, active_button_style)
        self._setup_chart_callback()
        # The ticks callback was removed along with the ticks tab
        # self._setup_ticks_callback()

    def _get_chart_layout(self, button_style, active_button_style):
        # Chart page layout
        return html.Div([
            # Chart title and interval buttons
            html.Div([
                html.H2(f"{self.symbol} Real-Time Chart", style={
                    'textAlign': 'center',
                    'color': '#FFFFFF',
                    'marginBottom': '10px'
                }),

                # Store interval data
                dcc.Store(id='interval-store', data={'interval': 1}),

                # Interval selection buttons
                html.Div([
                    html.Button('1s', id='btn-1s', n_clicks=0, style=active_button_style),
                    html.Button('5s', id='btn-5s', n_clicks=0, style=button_style),
                    html.Button('15s', id='btn-15s', n_clicks=0, style=button_style),
                    html.Button('30s', id='btn-30s', n_clicks=0, style=button_style),
                    html.Button('1m', id='btn-1m', n_clicks=0, style=button_style),
                ], style={
                    'display': 'flex',
                    'justifyContent': 'center',
                    'marginBottom': '15px'
                }),

                # Interval component - refresh every 300ms for better real-time updates
                dcc.Interval(
                    id='interval-component',
                    interval=300,  # milliseconds
                    n_intervals=0
                ),

                # Main chart
                dcc.Graph(id='live-chart', style={'height': '75vh'}),

                # Chart acknowledgment
                html.Div("Real-time trading chart with ML signals", style={
                    'textAlign': 'center',
                    'color': '#AAAAAA',
                    'fontSize': '12px',
                    'marginTop': '5px'
                })
            ])
        ])

    def _get_ticks_layout(self):
        # Ticks data page layout
        return html.Div([
            # Header and controls
            html.Div([
                html.H2(f"{self.symbol} Raw Tick Data (Last 5 Minutes)", style={
                    'textAlign': 'center',
                    'color': '#FFFFFF',
                    'margin': '10px 0'
                }),

                # Refresh button
                html.Button('Refresh Data', id='refresh-ticks-btn', n_clicks=0, style={
                    'backgroundColor': '#4CAF50',
                    'color': 'white',
                    'padding': '10px 20px',
                    'margin': '10px auto',
                    'border': 'none',
                    'borderRadius': '5px',
                    'fontSize': '14px',
                    'cursor': 'pointer',
                    'display': 'block'
                }),

                # Time window selector
                html.Div([
                    html.Label("Time Window:", style={'color': 'white', 'marginRight': '10px'}),
                    dcc.Dropdown(
                        id='time-window-dropdown',
                        options=[
                            {'label': 'Last 1 minute', 'value': 60},
                            {'label': 'Last 5 minutes', 'value': 300},
                            {'label': 'Last 15 minutes', 'value': 900},
                            {'label': 'Last 30 minutes', 'value': 1800},
                        ],
                        value=300,  # Default to 5 minutes
                        style={'width': '200px', 'backgroundColor': '#2C2C2C', 'color': 'black'}
                    )
                ], style={
                    'display': 'flex',
                    'alignItems': 'center',
                    'justifyContent': 'center',
                    'margin': '10px'
                }),
            ], style={
                'backgroundColor': '#2C2C2C',
                'padding': '10px',
                'borderRadius': '5px',
                'marginBottom': '15px'
            }),

            # Stats cards
            html.Div(id='tick-stats-cards', style={
                'display': 'flex',
                'flexWrap': 'wrap',
                'justifyContent': 'space-around',
                'marginBottom': '15px'
            }),

            # Ticks data table
            html.Div(id='ticks-table-container', style={
                'backgroundColor': '#232323',
                'padding': '10px',
                'borderRadius': '5px',
                'overflowX': 'auto'
            }),

            # Price movement chart
            html.Div([
                html.H3("Price Movement", style={
                    'textAlign': 'center',
                    'color': '#FFFFFF',
                    'margin': '10px 0'
                }),
                dcc.Graph(id='tick-price-chart')
            ], style={
                'backgroundColor': '#232323',
                'padding': '10px',
                'borderRadius': '5px',
                'marginTop': '15px'
            })
        ])

    def _setup_interval_callback(self, button_style, active_button_style):
        # Callback to update the interval based on button clicks and update button styles
        @self.app.callback(
            [Output('interval-store', 'data'),
             Output('btn-1s', 'style'),
             Output('btn-5s', 'style'),
             Output('btn-15s', 'style'),
             Output('btn-30s', 'style'),
             Output('btn-1m', 'style')],
            [Input('btn-1s', 'n_clicks'),
             Input('btn-5s', 'n_clicks'),
             Input('btn-15s', 'n_clicks'),
             Input('btn-30s', 'n_clicks'),
             Input('btn-1m', 'n_clicks')],
            [dash.dependencies.State('interval-store', 'data')]
        )
        def update_interval(n1, n5, n15, n30, n60, data):
            ctx = dash.callback_context
            if not ctx.triggered:
                # Default state (1s selected)
                return ({'interval': 1},
                        active_button_style, button_style, button_style, button_style, button_style)

            button_id = ctx.triggered[0]['prop_id'].split('.')[0]

            if button_id == 'btn-1s':
                return ({'interval': 1},
                        active_button_style, button_style, button_style, button_style, button_style)
            elif button_id == 'btn-5s':
                return ({'interval': 5},
                        button_style, active_button_style, button_style, button_style, button_style)
            elif button_id == 'btn-15s':
                return ({'interval': 15},
                        button_style, button_style, active_button_style, button_style, button_style)
            elif button_id == 'btn-30s':
                return ({'interval': 30},
                        button_style, button_style, button_style, active_button_style, button_style)
            elif button_id == 'btn-1m':
                return ({'interval': 60},
                        button_style, button_style, button_style, button_style, active_button_style)

            # Default case - keep the current interval and highlight the matching button
            current_interval = data.get('interval', 1)
            styles = [button_style] * 5  # All inactive by default

            # Set the active style based on the current interval
            if current_interval == 1:
                styles[0] = active_button_style
            elif current_interval == 5:
                styles[1] = active_button_style
            elif current_interval == 15:
                styles[2] = active_button_style
            elif current_interval == 30:
                styles[3] = active_button_style
            elif current_interval == 60:
                styles[4] = active_button_style

            return (data, *styles)

    def _setup_chart_callback(self):
        # Callback to update the chart
        @self.app.callback(
            Output('live-chart', 'figure'),
            [Input('interval-component', 'n_intervals'),
             Input('interval-store', 'data')]
        )
        def update_chart(n, interval_data):
            try:
                interval = interval_data.get('interval', 1)
                logger.debug(f"Updating chart with interval {interval}")

                # Get candlestick data for the selected interval, falling back to the cache
                try:
                    df = self.tick_storage.get_candles(interval_seconds=interval)
                    cache_key = f'{interval}s' if interval < 60 else '1m'
                    if df.empty and self.ohlcv_cache.get(cache_key) is not None:
                        df = self.ohlcv_cache[cache_key]
                except Exception as e:
                    logger.error(f"Error getting candles: {str(e)}")
                    df = pd.DataFrame()

                # Get data for the other timeframes (1m, 1h, 1d)
                df_1m = None
                df_1h = None
                df_1d = None

                try:
                    # Get 1m candles
                    if self.ohlcv_cache['1m'] is not None and not self.ohlcv_cache['1m'].empty:
                        df_1m = self.ohlcv_cache['1m'].copy()
                    else:
                        df_1m = self.tick_storage.get_candles(interval_seconds=60)

                    # Get 1h candles
                    if self.ohlcv_cache['1h'] is not None and not self.ohlcv_cache['1h'].empty:
                        df_1h = self.ohlcv_cache['1h'].copy()
                    else:
                        df_1h = self.tick_storage.get_candles(interval_seconds=3600)

                    # Get 1d candles
                    if self.ohlcv_cache['1d'] is not None and not self.ohlcv_cache['1d'].empty:
                        df_1d = self.ohlcv_cache['1d'].copy()
                    else:
                        df_1d = self.tick_storage.get_candles(interval_seconds=86400)

                    # Limit the number of candles to display, but keep enough for context
                    if df_1m is not None and not df_1m.empty:
                        df_1m = df_1m.tail(600)  # 600 one-minute candles (10 hours)
                    if df_1h is not None and not df_1h.empty:
                        df_1h = df_1h.tail(480)  # 480 hourly candles (20 days)
                    if df_1d is not None and not df_1d.empty:
                        df_1d = df_1d.tail(365 * 2)  # 2 years of daily candles

                except Exception as e:
                    logger.error(f"Error getting additional timeframes: {str(e)}")

# Create layout with 5 rows (main chart, volume, 1m, 1h, 1d)
|
|
fig = make_subplots(
|
|
rows=5, cols=1,
|
|
vertical_spacing=0.03,
|
|
subplot_titles=(
|
|
f'{self.symbol} Price ({interval}s)',
|
|
'Volume',
|
|
'1-Minute Chart',
|
|
'1-Hour Chart',
|
|
'1-Day Chart'
|
|
),
|
|
row_heights=[0.4, 0.15, 0.15, 0.15, 0.15]
|
|
)
|
|
|
|
# Add candlestick chart for main timeframe
|
|
if not df.empty and 'open' in df.columns:
|
|
# Candlestick chart
|
|
fig.add_trace(
|
|
go.Candlestick(
|
|
x=df.index,
|
|
open=df['open'],
|
|
high=df['high'],
|
|
low=df['low'],
|
|
close=df['close'],
|
|
name='OHLC'
|
|
),
|
|
row=1, col=1
|
|
)
|
|
|
|
# Calculate y-axis range with padding
|
|
low_min = df['low'].min()
|
|
high_max = df['high'].max()
|
|
price_range = high_max - low_min
|
|
y_min = low_min - (price_range * 0.05) # 5% padding below
|
|
y_max = high_max + (price_range * 0.05) # 5% padding above
|
|
|
|
# Set y-axis range to ensure candles are visible
|
|
fig.update_yaxes(range=[y_min, y_max], row=1, col=1)
|
|
|
|
# Volume bars
|
|
colors = ['rgba(0,255,0,0.7)' if close >= open else 'rgba(255,0,0,0.7)'
|
|
for open, close in zip(df['open'], df['close'])]
|
|
|
|
fig.add_trace(
|
|
go.Bar(
|
|
x=df.index,
|
|
y=df['volume'],
|
|
name='Volume',
|
|
marker_color=colors
|
|
),
|
|
row=2, col=1
|
|
)
|
|
|
|
# Add buy/sell markers and PnL annotations to the main chart
|
|
if hasattr(self, 'trades') and self.trades:
|
|
buy_times = []
|
|
buy_prices = []
|
|
buy_markers = []
|
|
sell_times = []
|
|
sell_prices = []
|
|
sell_markers = []
|
|
|
|
# Filter trades to only include recent ones (last 100)
|
|
recent_trades = self.trades[-100:]
|
|
|
|
for trade in recent_trades:
|
|
# Convert timestamp to datetime if it's not already
|
|
trade_time = trade.get('timestamp')
|
|
if isinstance(trade_time, (int, float)):
|
|
trade_time = pd.to_datetime(trade_time, unit='ms')
|
|
|
|
price = trade.get('price', 0)
|
|
pnl = trade.get('pnl', None)
|
|
action = trade.get('action', 'SELL') # Default to SELL
|
|
|
|
if action == 'BUY':
|
|
buy_times.append(trade_time)
|
|
buy_prices.append(price)
|
|
buy_markers.append("")
|
|
elif action == 'SELL':
|
|
sell_times.append(trade_time)
|
|
sell_prices.append(price)
|
|
# Add PnL as marker text if available
|
|
if pnl is not None:
|
|
pnl_text = f"{pnl:.4f}" if abs(pnl) < 0.01 else f"{pnl:.2f}"
|
|
sell_markers.append(pnl_text)
|
|
else:
|
|
sell_markers.append("")
|
|
|
|
# Add buy markers
|
|
if buy_times:
|
|
fig.add_trace(
|
|
go.Scatter(
|
|
x=buy_times,
|
|
y=buy_prices,
|
|
mode='markers',
|
|
name='Buy',
|
|
marker=dict(
|
|
symbol='triangle-up',
|
|
size=12,
|
|
color='rgba(0,255,0,0.8)',
|
|
line=dict(width=1, color='darkgreen')
|
|
),
|
|
text=buy_markers,
|
|
hoverinfo='x+y+text',
|
|
showlegend=True # Ensure Buy appears in legend
|
|
),
|
|
row=1, col=1
|
|
)
|
|
|
|
# Add vertical and horizontal connecting lines for buys
|
|
for i, (btime, bprice) in enumerate(zip(buy_times, buy_prices)):
|
|
# Add vertical dashed line to time axis
|
|
fig.add_shape(
|
|
type="line",
|
|
x0=btime, x1=btime,
|
|
y0=y_min, y1=bprice,
|
|
line=dict(color="rgba(0,255,0,0.5)", width=1, dash="dash"),
|
|
row=1, col=1
|
|
)
|
|
# Add horizontal dashed line showing the price level
|
|
fig.add_shape(
|
|
type="line",
|
|
x0=df.index.min(), x1=btime,
|
|
y0=bprice, y1=bprice,
|
|
line=dict(color="rgba(0,255,0,0.5)", width=1, dash="dash"),
|
|
row=1, col=1
|
|
)
|
|
|
|
# Add sell markers with PnL annotations
|
|
if sell_times:
|
|
fig.add_trace(
|
|
go.Scatter(
|
|
x=sell_times,
|
|
y=sell_prices,
|
|
mode='markers+text',
|
|
name='Sell',
|
|
marker=dict(
|
|
symbol='triangle-down',
|
|
size=12,
|
|
color='rgba(255,0,0,0.8)',
|
|
line=dict(width=1, color='darkred')
|
|
),
|
|
text=sell_markers,
|
|
textposition='top center',
|
|
textfont=dict(size=10),
|
|
hoverinfo='x+y+text',
|
|
showlegend=True # Ensure Sell appears in legend
|
|
),
|
|
row=1, col=1
|
|
)
|
|
|
|
# Add vertical and horizontal connecting lines for sells
|
|
for i, (stime, sprice) in enumerate(zip(sell_times, sell_prices)):
|
|
# Add vertical dashed line to time axis
|
|
fig.add_shape(
|
|
type="line",
|
|
x0=stime, x1=stime,
|
|
y0=y_min, y1=sprice,
|
|
line=dict(color="rgba(255,0,0,0.5)", width=1, dash="dash"),
|
|
row=1, col=1
|
|
)
|
|
# Add horizontal dashed line showing the price level
|
|
fig.add_shape(
|
|
type="line",
|
|
x0=df.index.min(), x1=stime,
|
|
y0=sprice, y1=sprice,
|
|
line=dict(color="rgba(255,0,0,0.5)", width=1, dash="dash"),
|
|
row=1, col=1
|
|
)
|
|
|
|
# Add connecting lines between consecutive buy-sell pairs
|
|
if len(buy_times) > 0 and len(sell_times) > 0:
|
|
# Create pairs of buy-sell trades based on timestamps
|
|
pairs = []
|
|
buys_copy = list(zip(buy_times, buy_prices))
|
|
|
|
for i, (stime, sprice) in enumerate(zip(sell_times, sell_prices)):
|
|
# Find the most recent buy before this sell
|
|
matching_buy = None
|
|
for j, (btime, bprice) in enumerate(buys_copy):
|
|
if btime < stime:
|
|
matching_buy = (btime, bprice)
|
|
buys_copy.pop(j) # Remove this buy to prevent reuse
|
|
break
|
|
|
|
if matching_buy:
|
|
pairs.append((matching_buy, (stime, sprice)))
|
|
|
|
# Add connecting lines for each pair
|
|
for (btime, bprice), (stime, sprice) in pairs:
|
|
# Draw line connecting the buy and sell points
|
|
fig.add_shape(
|
|
type="line",
|
|
x0=btime, x1=stime,
|
|
y0=bprice, y1=sprice,
|
|
line=dict(
|
|
color="rgba(255,255,255,0.5)",
|
|
width=1,
|
|
dash="dot"
|
|
),
|
|
row=1, col=1
|
|
)
|
|
|
|
# Add 1m chart
|
|
if df_1m is not None and not df_1m.empty and 'open' in df_1m.columns:
|
|
fig.add_trace(
|
|
go.Candlestick(
|
|
x=df_1m.index,
|
|
open=df_1m['open'],
|
|
high=df_1m['high'],
|
|
low=df_1m['low'],
|
|
close=df_1m['close'],
|
|
name='1m',
|
|
showlegend=False
|
|
),
|
|
row=3, col=1
|
|
)
|
|
|
|
# Set appropriate date format for 1m chart
|
|
fig.update_xaxes(
|
|
title_text="",
|
|
row=3,
|
|
col=1,
|
|
tickformat="%H:%M",
|
|
tickmode="auto",
|
|
nticks=12
|
|
)
|
|
|
|
# Add buy/sell markers to 1m chart if they fall within the visible timeframe
|
|
if hasattr(self, 'trades') and self.trades:
|
|
# Filter trades visible in 1m timeframe
|
|
min_time = df_1m.index.min()
|
|
max_time = df_1m.index.max()
|
|
|
|
# Ensure min_time and max_time are pandas.Timestamp objects
|
|
if isinstance(min_time, (int, float)):
|
|
min_time = pd.to_datetime(min_time, unit='ms')
|
|
if isinstance(max_time, (int, float)):
|
|
max_time = pd.to_datetime(max_time, unit='ms')
|
|
|
|
# Collect only trades within this timeframe
|
|
minute_buy_times = []
|
|
minute_buy_prices = []
|
|
minute_sell_times = []
|
|
minute_sell_prices = []
|
|
|
|
for trade in self.trades[-100:]:
|
|
trade_time = trade.get('timestamp')
|
|
if isinstance(trade_time, (int, float)):
|
|
# Convert numeric timestamp to datetime
|
|
trade_time = pd.to_datetime(trade_time, unit='ms')
|
|
elif not isinstance(trade_time, pd.Timestamp) and not isinstance(trade_time, datetime):
|
|
# Skip trades with invalid timestamp format
|
|
continue
|
|
|
|
# Check if trade falls within 1m chart timeframe
|
|
try:
|
|
if min_time <= trade_time <= max_time:
|
|
price = trade.get('price', 0)
|
|
action = trade.get('action', 'SELL')
|
|
|
|
if action == 'BUY':
|
|
minute_buy_times.append(trade_time)
|
|
minute_buy_prices.append(price)
|
|
elif action == 'SELL':
|
|
minute_sell_times.append(trade_time)
|
|
minute_sell_prices.append(price)
|
|
except TypeError:
|
|
# If comparison fails due to type mismatch, log the error and skip this trade
|
|
logger.warning(f"Type mismatch in timestamp comparison: min_time={type(min_time)}, trade_time={type(trade_time)}")
|
|
continue
|
|
|
|
# Add buy markers to 1m chart
|
|
if minute_buy_times:
|
|
fig.add_trace(
|
|
go.Scatter(
|
|
x=minute_buy_times,
|
|
y=minute_buy_prices,
|
|
mode='markers',
|
|
name='Buy (1m)',
|
|
marker=dict(
|
|
symbol='triangle-up',
|
|
size=8,
|
|
color='rgba(0,255,0,0.8)',
|
|
line=dict(width=1, color='darkgreen')
|
|
),
|
|
showlegend=False,
|
|
hoverinfo='x+y'
|
|
),
|
|
row=3, col=1
|
|
)
|
|
|
|
# Add sell markers to 1m chart
|
|
if minute_sell_times:
|
|
fig.add_trace(
|
|
go.Scatter(
|
|
x=minute_sell_times,
|
|
y=minute_sell_prices,
|
|
mode='markers',
|
|
name='Sell (1m)',
|
|
marker=dict(
|
|
symbol='triangle-down',
|
|
size=8,
|
|
color='rgba(255,0,0,0.8)',
|
|
line=dict(width=1, color='darkred')
|
|
),
|
|
showlegend=False,
|
|
hoverinfo='x+y'
|
|
),
|
|
row=3, col=1
|
|
)
|
|
|
|
# Add 1h chart
|
|
if df_1h is not None and not df_1h.empty and 'open' in df_1h.columns:
|
|
fig.add_trace(
|
|
go.Candlestick(
|
|
x=df_1h.index,
|
|
open=df_1h['open'],
|
|
high=df_1h['high'],
|
|
low=df_1h['low'],
|
|
close=df_1h['close'],
|
|
name='1h',
|
|
showlegend=False
|
|
),
|
|
row=4, col=1
|
|
)
|
|
|
|
# Set appropriate date format for 1h chart
|
|
fig.update_xaxes(
|
|
title_text="",
|
|
row=4,
|
|
col=1,
|
|
tickformat="%m-%d %H:%M",
|
|
tickmode="auto",
|
|
nticks=8
|
|
)
|
|
|
|
# Add buy/sell markers to 1h chart if they fall within the visible timeframe
|
|
if hasattr(self, 'trades') and self.trades:
|
|
# Filter trades visible in 1h timeframe
|
|
min_time = df_1h.index.min()
|
|
max_time = df_1h.index.max()
|
|
|
|
# Ensure min_time and max_time are pandas.Timestamp objects
|
|
if isinstance(min_time, (int, float)):
|
|
min_time = pd.to_datetime(min_time, unit='ms')
|
|
if isinstance(max_time, (int, float)):
|
|
max_time = pd.to_datetime(max_time, unit='ms')
|
|
|
|
# Collect only trades within this timeframe
|
|
hour_buy_times = []
|
|
hour_buy_prices = []
|
|
hour_sell_times = []
|
|
hour_sell_prices = []
|
|
|
|
for trade in self.trades[-200:]: # Check more trades for longer timeframe
|
|
trade_time = trade.get('timestamp')
|
|
if isinstance(trade_time, (int, float)):
|
|
# Convert numeric timestamp to datetime
|
|
trade_time = pd.to_datetime(trade_time, unit='ms')
|
|
elif not isinstance(trade_time, pd.Timestamp) and not isinstance(trade_time, datetime):
|
|
# Skip trades with invalid timestamp format
|
|
continue
|
|
|
|
# Check if trade falls within 1h chart timeframe
|
|
try:
|
|
if min_time <= trade_time <= max_time:
|
|
price = trade.get('price', 0)
|
|
action = trade.get('action', 'SELL')
|
|
|
|
if action == 'BUY':
|
|
hour_buy_times.append(trade_time)
|
|
hour_buy_prices.append(price)
|
|
elif action == 'SELL':
|
|
hour_sell_times.append(trade_time)
|
|
hour_sell_prices.append(price)
|
|
except TypeError:
|
|
# If comparison fails due to type mismatch, log the error and skip this trade
|
|
logger.warning(f"Type mismatch in timestamp comparison: min_time={type(min_time)}, trade_time={type(trade_time)}")
|
|
continue
|
|
|
|
# Add buy markers to 1h chart
|
|
if hour_buy_times:
|
|
fig.add_trace(
|
|
go.Scatter(
|
|
x=hour_buy_times,
|
|
y=hour_buy_prices,
|
|
mode='markers',
|
|
name='Buy (1h)',
|
|
marker=dict(
|
|
symbol='triangle-up',
|
|
size=6,
|
|
color='rgba(0,255,0,0.8)',
|
|
line=dict(width=1, color='darkgreen')
|
|
),
|
|
showlegend=False,
|
|
hoverinfo='x+y'
|
|
),
|
|
row=4, col=1
|
|
)
|
|
|
|
# Add sell markers to 1h chart
|
|
if hour_sell_times:
|
|
fig.add_trace(
|
|
go.Scatter(
|
|
x=hour_sell_times,
|
|
y=hour_sell_prices,
|
|
mode='markers',
|
|
name='Sell (1h)',
|
|
marker=dict(
|
|
symbol='triangle-down',
|
|
size=6,
|
|
color='rgba(255,0,0,0.8)',
|
|
line=dict(width=1, color='darkred')
|
|
),
|
|
showlegend=False,
|
|
hoverinfo='x+y'
|
|
),
|
|
row=4, col=1
|
|
)
|
|
# Add 1d chart
|
|
if df_1d is not None and not df_1d.empty and 'open' in df_1d.columns:
|
|
fig.add_trace(
|
|
go.Candlestick(
|
|
x=df_1d.index,
|
|
open=df_1d['open'],
|
|
high=df_1d['high'],
|
|
low=df_1d['low'],
|
|
close=df_1d['close'],
|
|
name='1d',
|
|
showlegend=False
|
|
),
|
|
row=5, col=1
|
|
)
|
|
|
|
# Set appropriate date format for 1d chart
|
|
fig.update_xaxes(
|
|
title_text="",
|
|
row=5,
|
|
col=1,
|
|
tickformat="%Y-%m-%d",
|
|
tickmode="auto",
|
|
nticks=10
|
|
)
|
|
|
|
# Add buy/sell markers to 1d chart if they fall within the visible timeframe
|
|
if hasattr(self, 'trades') and self.trades:
|
|
# Filter trades visible in 1d timeframe
|
|
min_time = df_1d.index.min()
|
|
max_time = df_1d.index.max()
|
|
|
|
# Ensure min_time and max_time are pandas.Timestamp objects
|
|
if isinstance(min_time, (int, float)):
|
|
min_time = pd.to_datetime(min_time, unit='ms')
|
|
if isinstance(max_time, (int, float)):
|
|
max_time = pd.to_datetime(max_time, unit='ms')
|
|
|
|
# Collect only trades within this timeframe
|
|
day_buy_times = []
|
|
day_buy_prices = []
|
|
day_sell_times = []
|
|
day_sell_prices = []
|
|
|
|
for trade in self.trades[-300:]: # Check more trades for daily timeframe
|
|
trade_time = trade.get('timestamp')
|
|
if isinstance(trade_time, (int, float)):
|
|
# Convert numeric timestamp to datetime
|
|
trade_time = pd.to_datetime(trade_time, unit='ms')
|
|
elif not isinstance(trade_time, pd.Timestamp) and not isinstance(trade_time, datetime):
|
|
# Skip trades with invalid timestamp format
|
|
continue
|
|
|
|
# Check if trade falls within 1d chart timeframe
|
|
try:
|
|
if min_time <= trade_time <= max_time:
|
|
price = trade.get('price', 0)
|
|
action = trade.get('action', 'SELL')
|
|
|
|
if action == 'BUY':
|
|
day_buy_times.append(trade_time)
|
|
day_buy_prices.append(price)
|
|
elif action == 'SELL':
|
|
day_sell_times.append(trade_time)
|
|
day_sell_prices.append(price)
|
|
except TypeError:
|
|
# If comparison fails due to type mismatch, log the error and skip this trade
|
|
logger.warning(f"Type mismatch in timestamp comparison: min_time={type(min_time)}, trade_time={type(trade_time)}")
|
|
continue
|
|
|
|
# Add buy markers to 1d chart
|
|
if day_buy_times:
|
|
fig.add_trace(
|
|
go.Scatter(
|
|
x=day_buy_times,
|
|
y=day_buy_prices,
|
|
mode='markers',
|
|
name='Buy (1d)',
|
|
marker=dict(
|
|
symbol='triangle-up',
|
|
size=5,
|
|
color='rgba(0,255,0,0.8)',
|
|
line=dict(width=1, color='darkgreen')
|
|
),
|
|
showlegend=False,
|
|
hoverinfo='x+y'
|
|
),
|
|
row=5, col=1
|
|
)
|
|
|
|
# Add sell markers to 1d chart
|
|
if day_sell_times:
|
|
fig.add_trace(
|
|
go.Scatter(
|
|
x=day_sell_times,
|
|
y=day_sell_prices,
|
|
mode='markers',
|
|
name='Sell (1d)',
|
|
marker=dict(
|
|
symbol='triangle-down',
|
|
size=5,
|
|
color='rgba(255,0,0,0.8)',
|
|
line=dict(width=1, color='darkred')
|
|
),
|
|
showlegend=False,
|
|
hoverinfo='x+y'
|
|
),
|
|
row=5, col=1
|
|
)
|
|
|
|
# Add trading info annotation if available
|
|
if hasattr(self, 'current_signal') and self.current_signal:
|
|
signal_color = "#33DD33" if self.current_signal == "BUY" else "#FF4444" if self.current_signal == "SELL" else "#BBBBBB"
|
|
|
|
# Format position value
|
|
position_text = f"{self.current_position:.4f}" if self.current_position < 0.01 else f"{self.current_position:.2f}"
|
|
|
|
# Format PnL with color based on value
|
|
pnl_color = "#33DD33" if self.session_pnl >= 0 else "#FF4444"
|
|
pnl_text = f"<b style='color:{pnl_color}'>{self.session_pnl:.4f}</b>"
|
|
|
|
# Create trading info text
|
|
info_text = (
|
|
f"Signal: <b style='color:{signal_color}'>{self.current_signal}</b> | "
|
|
f"Position: <b>{position_text}</b> | "
|
|
f"Balance: <b>${self.session_balance:.2f}</b> | "
|
|
f"PnL: {pnl_text}"
|
|
)
|
|
|
|
# Add annotation
|
|
fig.add_annotation(
|
|
x=0.5, y=1.05,
|
|
xref="paper", yref="paper",
|
|
text=info_text,
|
|
showarrow=False,
|
|
font=dict(size=14, color="white"),
|
|
bgcolor="rgba(50,50,50,0.6)",
|
|
borderwidth=1,
|
|
borderpad=6,
|
|
align="center"
|
|
)
|
|
|
|
# Update layout
|
|
fig.update_layout(
|
|
title_text=f"{self.symbol} Real-Time Data",
|
|
title_x=0.5,
|
|
xaxis_rangeslider_visible=False,
|
|
height=1000, # Increased height to accommodate all charts
|
|
template='plotly_dark',
|
|
paper_bgcolor='rgba(0,0,0,0)',
|
|
plot_bgcolor='rgba(25,25,50,1)'
|
|
)
|
|
|
|
# Update axes styling for all subplots
|
|
fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)')
|
|
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)')
|
|
|
|
# Hide rangesliders for all candlestick charts
|
|
fig.update_layout(
|
|
xaxis_rangeslider_visible=False,
|
|
xaxis2_rangeslider_visible=False,
|
|
xaxis3_rangeslider_visible=False,
|
|
xaxis4_rangeslider_visible=False,
|
|
xaxis5_rangeslider_visible=False
|
|
)
|
|
|
|
# Improve date formatting for the main chart
|
|
fig.update_xaxes(
|
|
title_text="",
|
|
row=1,
|
|
col=1,
|
|
tickformat="%H:%M:%S",
|
|
tickmode="auto",
|
|
nticks=15
|
|
)
|
|
|
|
return fig
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error updating chart: {str(e)}")
|
|
import traceback
|
|
logger.error(traceback.format_exc())
|
|
fig = go.Figure()
|
|
fig.add_annotation(
|
|
x=0.5, y=0.5,
|
|
text=f"Error updating chart: {str(e)}",
|
|
showarrow=False,
|
|
font=dict(size=14, color="red"),
|
|
xref="paper", yref="paper"
|
|
)
|
|
return fig
|
|
|
|
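
    # Note on the rangeslider keys above (a known Plotly convention): make_subplots
    # names the x-axes of rows 1-5 'xaxis', 'xaxis2', ..., 'xaxis5', so the magic
    # underscore keys xaxis_rangeslider_visible .. xaxis5_rangeslider_visible
    # disable the rangeslider on each of the five candlestick rows.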

    def _interval_to_seconds(self, interval_key: str) -> int:
        """Convert an interval key to seconds"""
        mapping = {
            '1s': 1,
            '1m': 60,
            '1h': 3600,
            '1d': 86400
        }
        return mapping.get(interval_key, 1)
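
    # Illustrative usage (hypothetical calls):
    #     self._interval_to_seconds('1h')   # -> 3600
    #     self._interval_to_seconds('5s')   # unknown keys fall back to 1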

    async def start_websocket(self):
        ws = ExchangeWebSocket(self.symbol)
        connection_attempts = 0
        max_attempts = 10  # Maximum connection attempts before a longer waiting period

        while True:  # Keep trying to maintain the connection
            connection_attempts += 1
            if not await ws.connect():
                logger.error(f"Failed to connect to exchange for {self.symbol}")
                # Gradually increase the wait time with the number of failures
                wait_time = min(5 * connection_attempts, 60)  # Cap at 60 seconds
                logger.warning(f"Waiting {wait_time} seconds before retry (attempt {connection_attempts})")

                if connection_attempts >= max_attempts:
                    logger.warning(f"Reached {max_attempts} connection attempts, taking a longer break")
                    await asyncio.sleep(120)  # 2-minute wait after max attempts
                    connection_attempts = 0  # Reset the counter
                else:
                    await asyncio.sleep(wait_time)
                continue

            # Successfully connected
            connection_attempts = 0

            try:
                logger.info(f"WebSocket connected for {self.symbol}, beginning data collection")
                tick_count = 0
                last_tick_count_log = time.time()
                last_status_report = time.time()

                # Track stats for reporting
                price_min = float('inf')
                price_max = float('-inf')
                price_last = None
                volume_total = 0
                start_collection_time = time.time()

                while True:
                    if not ws.running:
                        logger.warning(f"WebSocket connection lost for {self.symbol}, breaking loop")
                        break

                    data = await ws.receive()
                    if data:
                        if data.get('type') == 'kline':
                            # Use kline data directly for the candlestick
                            trade_data = {
                                'timestamp': data['timestamp'],
                                'price': data['price'],
                                'volume': data['volume'],
                                'open': data['open'],
                                'high': data['high'],
                                'low': data['low']
                            }
                            logger.debug(f"Received kline data: {data}")
                        else:
                            # Use raw trade data
                            trade_data = {
                                'timestamp': data['timestamp'],
                                'price': data['price'],
                                'volume': data['volume']
                            }

                        # Update stats
                        price = trade_data['price']
                        volume = trade_data['volume']
                        price_min = min(price_min, price)
                        price_max = max(price_max, price)
                        price_last = price
                        volume_total += volume

                        # Store the raw tick in the tick storage
                        self.tick_storage.add_tick(trade_data)
                        tick_count += 1

                        # Also update the legacy candlestick data for backward
                        # compatibility, if that attribute exists
                        if hasattr(self, 'candlestick_data'):
                            self.candlestick_data.update_from_trade(trade_data)

                        # Log tick counts every 10 seconds
                        current_time = time.time()
                        if current_time - last_tick_count_log >= 10:
                            elapsed = current_time - last_tick_count_log
                            tps = tick_count / elapsed if elapsed > 0 else 0
                            logger.info(f"{self.symbol}: Collected {tick_count} ticks in last {elapsed:.1f}s ({tps:.2f} ticks/sec), total: {len(self.tick_storage.ticks)}")
                            last_tick_count_log = current_time
                            tick_count = 0

                            # Check that ticks are being converted to candles
                            if len(self.tick_storage.ticks) > 0:
                                sample_df = self.tick_storage.get_candles(interval_seconds=1)
                                logger.info(f"{self.symbol}: Sample candle count: {len(sample_df)}")

                        # Periodic status report (every 60 seconds)
                        if current_time - last_status_report >= 60:
                            elapsed_total = current_time - start_collection_time
                            logger.info(f"{self.symbol} Status Report:")
                            logger.info(f"  Collection time: {elapsed_total:.1f} seconds")
                            logger.info(f"  Price range: {price_min:.2f} - {price_max:.2f} (last: {price_last:.2f})")
                            logger.info(f"  Total volume: {volume_total:.8f}")
                            logger.info(f"  Active ticks in storage: {len(self.tick_storage.ticks)}")

                            # Reset stats for the next period
                            last_status_report = current_time
                            price_min = float('inf') if price_last is None else price_last
                            price_max = float('-inf') if price_last is None else price_last
                            volume_total = 0

                    await asyncio.sleep(0.01)
            except websockets.exceptions.ConnectionClosed as e:
                logger.error(f"WebSocket connection closed for {self.symbol}: {str(e)}")
            except Exception as e:
                logger.error(f"Error in WebSocket loop for {self.symbol}: {str(e)}")
                import traceback
                logger.error(traceback.format_exc())
            finally:
                logger.info(f"Closing WebSocket connection for {self.symbol}")
                await ws.close()

            logger.info(f"Waiting 5 seconds before reconnecting {self.symbol} WebSocket...")
            await asyncio.sleep(5)
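
    # Reconnect schedule sketch (derived from the backoff above): attempt 1 waits 5s,
    # attempt 2 waits 10s, ..., attempt 9 waits 45s; at attempt 10 the 120s break
    # fires and the counter resets, so the 60s cap acts only as a safeguard.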

    def run(self, host='localhost', port=8050):
        """Run the Dash app

        Args:
            host: Hostname to run on
            port: Port to run on
        """
        logger.info(f"Starting Dash app on {host}:{port}")

        # Ensure the layout was created
        if not hasattr(self, 'app') or not self.app.layout:
            logger.error("App layout not initialized properly")
            return

        # If the interval component is not in the layout, add it
        if 'interval-component' not in str(self.app.layout):
            logger.warning("Interval component not found in layout, adding it")
            self.app.layout.children.append(
                dcc.Interval(
                    id='interval-component',
                    interval=500,  # 500 ms for real-time updates
                    n_intervals=0
                )
            )

        # Start the websocket connection in a separate daemon thread.
        # asyncio.run creates and manages its own event loop inside that thread,
        # so no event loop needs to be set up here beforehand.
        self.websocket_thread = threading.Thread(target=lambda: asyncio.run(self.start_websocket()))
        self.websocket_thread.daemon = True
        self.websocket_thread.start()

        # Ensure historical data is loaded before starting
        self._load_historical_data()

        try:
            self.app.run(host=host, port=port, debug=False)
        except Exception as e:
            logger.error(f"Error running Dash app: {str(e)}")
        finally:
            # Ensure resources are cleaned up
            self._save_candles_to_disk(force=True)
            logger.info("Dash app stopped")
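
    # Threading note (a minimal sketch of the pattern used above, hypothetical names):
    #     t = threading.Thread(target=lambda: asyncio.run(reader_coro()), daemon=True)
    #     t.start()                             # websocket reader gets its own event loop
    #     app.run(host='localhost', port=8050)  # Dash blocks the calling thread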

    def _load_historical_data(self):
        """Load historical data for all timeframes from the Binance API and the local cache"""
        try:
            logger.info(f"Loading historical data for {self.symbol}...")

            # Intervals to fetch
            intervals = {
                '1s': 1,
                '1m': 60,
                '1h': 3600,
                '1d': 86400
            }

            # Track load status per interval
            load_status = {interval: False for interval in intervals.keys()}

            # Step 1: try to load from local cache files
            logger.info("Step 1: Loading from local cache files...")
            for interval_key, interval_seconds in intervals.items():
                try:
                    cache_file = os.path.join(self.historical_data.cache_dir,
                                              f"{self.symbol.replace('/', '_')}_{interval_key}_candles.csv")

                    logger.info(f"Checking for cached {interval_key} data at {cache_file}")
                    if os.path.exists(cache_file):
                        # Check freshness: up to 3 days old for 1d data, 1 day for everything else
                        file_age = time.time() - os.path.getmtime(cache_file)
                        max_age = 259200 if interval_key == '1d' else 86400
                        logger.info(f"Cache file age: {file_age:.1f}s, max allowed: {max_age}s")

                        if file_age <= max_age:
                            logger.info(f"Loading {interval_key} candles from cache")
                            cached_df = pd.read_csv(cache_file)
                            if not cached_df.empty:
                                # Diagnostic info about the loaded data
                                logger.info(f"Loaded {len(cached_df)} candles from {cache_file}")
                                logger.info(f"Columns: {cached_df.columns.tolist()}")
                                logger.info(f"First few rows: {cached_df.head(2).to_dict('records')}")

                                # Convert timestamp strings back to datetimes
                                if 'timestamp' in cached_df.columns:
                                    try:
                                        if not pd.api.types.is_datetime64_any_dtype(cached_df['timestamp']):
                                            cached_df['timestamp'] = pd.to_datetime(cached_df['timestamp'])
                                            logger.info("Successfully converted timestamps to datetime")
                                    except Exception as e:
                                        logger.warning(f"Could not convert timestamp column for {interval_key}: {str(e)}")

                                # Only keep the last 2000 candles for memory efficiency
                                if len(cached_df) > 2000:
                                    cached_df = cached_df.tail(2000)
                                    logger.info("Truncated to last 2000 candles")

                                # Add to the candle cache
                                for _, row in cached_df.iterrows():
                                    self.candle_cache.candles[interval_key].append(row.to_dict())

                                # Update the OHLCV cache
                                self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key, count=2000)
                                logger.info(f"Successfully loaded {len(self.ohlcv_cache[interval_key])} cached {interval_key} candles")

                                if len(self.ohlcv_cache[interval_key]) >= 500:
                                    load_status[interval_key] = True
                                    # Skip the API fetch when the cache sufficed, except
                                    # for the 1d timeframe which is always refreshed
                                    if interval_key != '1d':
                                        continue
                        else:
                            logger.info(f"Cache file for {interval_key} is too old ({file_age:.1f}s)")
                    else:
                        logger.info(f"No cache file found for {interval_key}")
                except Exception as e:
                    logger.error(f"Error loading cached {interval_key} candles: {str(e)}")
                    import traceback
                    logger.error(traceback.format_exc())

            # Step 2: for timeframes other than 1s, fetch from the API as a backup
            # or for fresh data
            logger.info("Step 2: Fetching data from API for missing timeframes...")
            for interval_key, interval_seconds in intervals.items():
                # Skip 1s for API requests, and anything already loaded
                if interval_key == '1s' or load_status[interval_key]:
                    logger.info(f"Skipping API fetch for {interval_key}: already loaded or 1s timeframe")
                    continue

                try:
                    logger.info(f"Fetching {interval_key} candles from API for {self.symbol}")
                    historical_df = self.historical_data.get_historical_candles(
                        symbol=self.symbol,
                        interval_seconds=interval_seconds,
                        limit=500  # Fetch 500 candles
                    )

                    if not historical_df.empty:
                        logger.info(f"Loaded {len(historical_df)} historical candles for {self.symbol} {interval_key} from API")

                        # If the cache already has data, merge only the newer records
                        # to avoid duplicates
                        if self.ohlcv_cache[interval_key] is not None and not self.ohlcv_cache[interval_key].empty:
                            existing_df = self.ohlcv_cache[interval_key]
                            latest_time = existing_df['timestamp'].max()
                            new_candles = historical_df[historical_df['timestamp'] > latest_time]
                            if not new_candles.empty:
                                logger.info(f"Adding {len(new_candles)} new candles to existing {interval_key} cache")
                                for _, row in new_candles.iterrows():
                                    self.candle_cache.candles[interval_key].append(row.to_dict())
                        else:
                            # No existing data; add everything from the API
                            for _, row in historical_df.iterrows():
                                self.candle_cache.candles[interval_key].append(row.to_dict())

                        # Update the OHLCV cache with the combined data
                        self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key, count=2000)
                        logger.info(f"Total {interval_key} candles in cache: {len(self.ohlcv_cache[interval_key])}")

                        if len(self.ohlcv_cache[interval_key]) >= 500:
                            load_status[interval_key] = True
                    else:
                        logger.warning(f"No historical data available from API for {self.symbol} {interval_key}")
                except Exception as e:
                    logger.error(f"Error fetching {interval_key} data from API: {str(e)}")
                    import traceback
                    logger.error(traceback.format_exc())

            # Log a summary of the loaded data
            logger.info("Historical data load summary:")
            for interval_key in intervals.keys():
                count = len(self.ohlcv_cache[interval_key]) if self.ohlcv_cache[interval_key] is not None else 0
                status = "Success" if load_status[interval_key] else "Failed"
                if 0 < count < 500:
                    status = "Partial"
                logger.info(f"{interval_key}: {count} candles - {status}")

        except Exception as e:
            logger.error(f"Error in _load_historical_data: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
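
    # Cache layout (illustrative): candles persist per symbol and timeframe as CSV
    # under self.historical_data.cache_dir, e.g. for BTC/USDT 1m data:
    #     <cache_dir>/BTC_USDT_1m_candles.csv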

    def _save_candles_to_disk(self, force=False):
        """Save the current candle cache to disk for persistence between runs"""
        try:
            # Only save if sufficient time has passed (every 5 minutes), unless forced
            current_time = time.time()
            if not force and current_time - self.last_cache_save_time < 300:
                return

            # Save each timeframe's candles to disk
            for interval_key, candles in self.candle_cache.candles.items():
                if candles:
                    df = pd.DataFrame(list(candles))
                    if not df.empty:
                        # Ensure the timestamp column is properly formatted
                        if 'timestamp' in df.columns:
                            try:
                                if not pd.api.types.is_datetime64_any_dtype(df['timestamp']):
                                    df['timestamp'] = pd.to_datetime(df['timestamp'])
                            except Exception as e:
                                logger.warning(f"Could not convert timestamp column for {interval_key}: {str(e)}")

                        # Save to the cache directory
                        cache_file = os.path.join(self.historical_data.cache_dir,
                                                  f"{self.symbol.replace('/', '_')}_{interval_key}_candles.csv")
                        df.to_csv(cache_file, index=False)
                        logger.info(f"Saved {len(df)} {interval_key} candles to {cache_file}")

            self.last_cache_save_time = current_time
            logger.info(f"Saved all candle caches to disk at {datetime.now()}")
        except Exception as e:
            logger.error(f"Error saving candles to disk: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
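
    # Shutdown path: run() calls _save_candles_to_disk(force=True) in its finally
    # block, bypassing the 5-minute throttle so the latest candles are not lost.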

    def add_nn_signal(self, signal_type, timestamp, probability=None):
        """Add a neural network signal to be displayed on the chart

        Args:
            signal_type: The type of signal (BUY, SELL, HOLD)
            timestamp: The timestamp for the signal
            probability: Optional probability/confidence value
        """
        if signal_type not in ['BUY', 'SELL', 'HOLD']:
            logger.warning(f"Invalid NN signal type: {signal_type}")
            return

        # Convert the timestamp to a datetime if it is not one already
        if not isinstance(timestamp, datetime):
            try:
                if isinstance(timestamp, str):
                    timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                elif isinstance(timestamp, (int, float)):
                    timestamp = datetime.fromtimestamp(timestamp / 1000.0)
            except Exception as e:
                logger.error(f"Error converting timestamp for NN signal: {str(e)}")
                timestamp = datetime.now()

        # Record the signal
        self.nn_signals.append({
            'type': signal_type,
            'timestamp': timestamp,
            'probability': probability,
            'added': datetime.now()
        })

        # Only keep the most recent 50 signals
        if len(self.nn_signals) > 50:
            self.nn_signals = self.nn_signals[-50:]

        logger.info(f"Added NN signal: {signal_type} at {timestamp}")

    def add_trade(self, price, timestamp, pnl=None, amount=0.1, action=None, type=None):
        """Add a trade to be displayed on the chart

        Args:
            price: The price at which the trade was executed
            timestamp: The timestamp for the trade
            pnl: Optional profit and loss value for the trade
            amount: Amount traded
            action: The type of trade (BUY or SELL) - alternative to the type parameter
            type: The type of trade (BUY or SELL) - alternative to the action parameter
        """
        # Accept both the action and type parameters for backward compatibility
        trade_type = type or action

        # Default to BUY if no trade type was specified
        if trade_type is None:
            logger.warning(f"Trade type not specified in add_trade call, defaulting to BUY. Price: {price}, Timestamp: {timestamp}")
            trade_type = "BUY"

        if isinstance(trade_type, int):
            trade_type = "BUY" if trade_type == 0 else "SELL"

        # Normalize string trade types to uppercase
        if isinstance(trade_type, str):
            trade_type = trade_type.upper()

        if trade_type not in ['BUY', 'SELL']:
            # The `type` parameter shadows the builtin type() here, so use
            # __class__ to report the value's type in the log message
            logger.warning(f"Invalid trade type: {trade_type} (value type: {trade_type.__class__.__name__}), defaulting to BUY. Price: {price}, Timestamp: {timestamp}")
            trade_type = "BUY"

        # Convert the timestamp to a datetime if it is not one already
        if not isinstance(timestamp, datetime):
            try:
                if isinstance(timestamp, str):
                    timestamp = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                elif isinstance(timestamp, (int, float)):
                    timestamp = datetime.fromtimestamp(timestamp / 1000.0)
            except Exception as e:
                logger.error(f"Error converting timestamp for trade: {str(e)}")
                timestamp = datetime.now()

        # Create the trade object
        trade = {
            'price': price,
            'timestamp': timestamp,
            'pnl': pnl,
            'amount': amount,
            'action': trade_type
        }

        # Append to the trades list, creating it if necessary
        if not hasattr(self, 'trades'):
            self.trades = []
        self.trades.append(trade)

        # Log the trade for debugging
        pnl_str = f" with PnL: {pnl}" if pnl is not None else ""
        logger.info(f"Added trade: {trade_type} {amount} at price {price} at time {timestamp}{pnl_str}")

        # The trade will appear on the next refresh of the chart, driven by the
        # dcc.Interval component (every 500 ms). Dash callbacks cannot be invoked
        # directly from the server side, so no immediate re-render is attempted here.

        return trade
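
    # Illustrative call (hypothetical values):
    #     chart.add_trade(price=3450.25, timestamp=time.time() * 1000,
    #                     pnl=0.0042, action='SELL')
    # Millisecond epoch numbers, ISO-8601 strings, and datetime objects are all
    # accepted timestamp formats.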

    def update_trading_info(self, signal=None, position=None, balance=None, pnl=None):
        """Update the current trading information displayed on the chart

        Args:
            signal: Current signal (BUY, SELL, HOLD)
            position: Current position size
            balance: Current session balance
            pnl: Current session PnL
        """
        if signal is not None:
            if signal in ['BUY', 'SELL', 'HOLD']:
                self.current_signal = signal
                self.signal_time = datetime.now()
            else:
                logger.warning(f"Invalid signal type: {signal}")

        if position is not None:
            self.current_position = position

        if balance is not None:
            self.session_balance = balance

        if pnl is not None:
            self.session_pnl = pnl

        logger.debug(f"Updated trading info: Signal={self.current_signal}, Position={self.current_position}, Balance=${self.session_balance:.2f}, PnL={self.session_pnl:.4f}")
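
    # Illustrative call (hypothetical values):
    #     chart.update_trading_info(signal='BUY', position=0.05, balance=1000.0, pnl=0.0123)
    # Arguments left as None are not touched, so partial updates are safe.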

async def main():
    global charts  # Make charts globally accessible for NN integration
    symbols = ["ETH/USDT", "BTC/USDT"]
    logger.info(f"Starting application for symbols: {symbols}")

    # Initialize the neural network if enabled
    if NN_ENABLED:
        logger.info("Initializing Neural Network integration...")
        if setup_neural_network():
            logger.info("Neural Network integration initialized successfully")
        else:
            logger.warning("Neural Network integration failed to initialize")

    charts = []
    websocket_tasks = []

    # Create a chart and a websocket task for each symbol
    for symbol in symbols:
        chart = RealTimeChart(symbol)
        charts.append(chart)
        websocket_tasks.append(asyncio.create_task(chart.start_websocket()))

    # Run Dash in separate threads so it does not block the event loop
    server_threads = []
    for i, chart in enumerate(charts):
        port = 8050 + i  # Use a different port for each chart
        logger.info(f"Starting chart for {chart.symbol} on port {port}")
        # Bind chart and port as default arguments so each thread gets its own values
        thread = Thread(target=lambda c=chart, p=port: c.run(port=p))
        thread.daemon = True
        thread.start()
        server_threads.append(thread)
        logger.info(f"Thread started for {chart.symbol} on port {port}")

    try:
        # Keep the main task running
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
    finally:
        for task in websocket_tasks:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
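
# Port assignment (derived from the loop above): with the default symbol list, the
# ETH/USDT dashboard serves on http://localhost:8050 and BTC/USDT on http://localhost:8051.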

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Application terminated by user")