# gogo2/realtime.py
import asyncio
import json
import logging
from typing import Dict, List, Optional
import websockets
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import dash
from dash import html, dcc
from dash.dependencies import Input, Output
import pandas as pd
import numpy as np
from collections import deque
import time
from threading import Thread
import requests
import os
from datetime import datetime, timedelta
import pytz
import tzlocal
import threading
import random
import dash_bootstrap_components as dbc
import uuid
class BinanceHistoricalData:
"""
Class for fetching historical price data from Binance.
"""
def __init__(self):
self.base_url = "https://api.binance.com/api/v3"
self.cache_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cache')
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
def get_historical_candles(self, symbol, interval_seconds=3600, limit=1000):
"""
Fetch historical candles from Binance API.
Args:
symbol (str): Trading pair symbol (e.g., "BTC/USDT")
interval_seconds (int): Timeframe in seconds (e.g., 3600 for 1h)
limit (int): Number of candles to fetch
Returns:
pd.DataFrame: DataFrame with OHLCV data
"""
# Convert interval_seconds to Binance interval format
interval_map = {
1: "1s",
60: "1m",
300: "5m",
900: "15m",
1800: "30m",
3600: "1h",
14400: "4h",
86400: "1d"
}
interval = interval_map.get(interval_seconds, "1h")
# Format symbol for Binance API (remove slash)
formatted_symbol = symbol.replace("/", "")
# Check the cache first
cached_data = self._load_from_cache(formatted_symbol, interval)
if cached_data is not None and len(cached_data) >= limit:
logger.info(f"Using cached historical data for {symbol} ({interval})")
return cached_data
try:
# Build URL for klines endpoint
url = f"{self.base_url}/klines"
params = {
"symbol": formatted_symbol,
"interval": interval,
"limit": limit
}
# Make the request
response = requests.get(url, params=params)
response.raise_for_status()
# Parse the response
data = response.json()
# Create dataframe
df = pd.DataFrame(data, columns=[
"timestamp", "open", "high", "low", "close", "volume",
"close_time", "quote_asset_volume", "number_of_trades",
"taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
])
# Convert timestamp to datetime
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
# Convert price columns to float
for col in ["open", "high", "low", "close", "volume"]:
df[col] = df[col].astype(float)
# Sort by timestamp
df = df.sort_values("timestamp")
# Save to cache for future use
self._save_to_cache(df, formatted_symbol, interval)
logger.info(f"Fetched {len(df)} candles for {symbol} ({interval})")
return df
except Exception as e:
logger.error(f"Error fetching historical data from Binance: {str(e)}")
# Return cached data if we have it, even if it's not enough
if cached_data is not None:
logger.warning(f"Using cached data instead (may be incomplete)")
return cached_data
# Return empty dataframe on error
return pd.DataFrame()
def _get_cache_filename(self, symbol, interval):
"""Get filename for cache file"""
return os.path.join(self.cache_dir, f"{symbol}_{interval}_candles.csv")
def _load_from_cache(self, symbol, interval):
"""Load candles from cache file"""
try:
cache_file = self._get_cache_filename(symbol, interval)
if os.path.exists(cache_file):
df = pd.read_csv(cache_file)
df["timestamp"] = pd.to_datetime(df["timestamp"])
return df
except Exception as e:
logger.error(f"Error loading cached data: {str(e)}")
return None
def _save_to_cache(self, df, symbol, interval):
"""Save candles to cache file"""
try:
cache_file = self._get_cache_filename(symbol, interval)
df.to_csv(cache_file, index=False)
logger.info(f"Saved {len(df)} candles to cache: {cache_file}")
return True
except Exception as e:
logger.error(f"Error saving to cache: {str(e)}")
return False
def get_recent_trades(self, symbol, limit=1000):
"""Get recent trades for a symbol"""
formatted_symbol = symbol.replace("/", "")
try:
url = f"{self.base_url}/trades"
params = {
"symbol": formatted_symbol,
"limit": limit
}
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
# Create dataframe
df = pd.DataFrame(data)
df["time"] = pd.to_datetime(df["time"], unit="ms")
df["price"] = df["price"].astype(float)
df["qty"] = df["qty"].astype(float)
return df
except Exception as e:
logger.error(f"Error fetching recent trades: {str(e)}")
return pd.DataFrame()
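# Example (sketch): one way to pull hourly candles through the cache-aware fetcher
# above. The symbol and limit are illustrative values, and this helper is not
# referenced anywhere else in the module.
def _example_fetch_hourly_candles():
    historical = BinanceHistoricalData()
    df = historical.get_historical_candles("ETH/USDT", interval_seconds=3600, limit=200)
    if not df.empty:
        # Candles come back oldest-first with float OHLCV columns
        print(df[["timestamp", "open", "high", "low", "close", "volume"]].tail())
    return df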
# Configure logging with more detailed format
logging.basicConfig(
level=logging.INFO,  # INFO by default; switch to logging.DEBUG for more verbose logs
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('realtime_chart.log')
]
)
logger = logging.getLogger(__name__)
# Neural Network integration (conditional import)
NN_ENABLED = os.environ.get('ENABLE_NN_MODELS', '0') == '1'
nn_orchestrator = None
nn_inference_thread = None
if NN_ENABLED:
try:
import sys
# Add project root to sys.path if needed
project_root = os.path.dirname(os.path.abspath(__file__))
if project_root not in sys.path:
sys.path.append(project_root)
from NN.main import NeuralNetworkOrchestrator
logger.info("Neural Network module enabled")
except ImportError as e:
logger.warning(f"Failed to import Neural Network module, disabling NN features: {str(e)}")
NN_ENABLED = False
# NN utility functions
def setup_neural_network():
"""Initialize the neural network components if enabled"""
global nn_orchestrator, NN_ENABLED
if not NN_ENABLED:
return False
try:
# Get configuration from environment variables or use defaults
symbol = os.environ.get('NN_SYMBOL', 'ETH/USDT')
timeframes = os.environ.get('NN_TIMEFRAMES', '1m,5m,1h,4h,1d').split(',')
output_size = int(os.environ.get('NN_OUTPUT_SIZE', '3')) # 3 for BUY/HOLD/SELL
# Configure the orchestrator
config = {
'symbol': symbol,
'timeframes': timeframes,
'window_size': int(os.environ.get('NN_WINDOW_SIZE', '20')),
'n_features': 5, # OHLCV
'output_size': output_size,
'model_dir': 'NN/models/saved',
'data_dir': 'NN/data'
}
# Initialize the orchestrator
logger.info(f"Initializing Neural Network Orchestrator with config: {config}")
nn_orchestrator = NeuralNetworkOrchestrator(config)
# Start inference thread if enabled
inference_interval = int(os.environ.get('NN_INFERENCE_INTERVAL', '60'))
if inference_interval > 0:
start_nn_inference_thread(inference_interval)
return True
except Exception as e:
logger.error(f"Error setting up neural network: {str(e)}")
import traceback
logger.error(traceback.format_exc())
NN_ENABLED = False
return False
def start_nn_inference_thread(interval_seconds):
"""Start a background thread to periodically run inference with the neural network"""
global nn_inference_thread
if not NN_ENABLED or nn_orchestrator is None:
logger.warning("Cannot start inference thread - Neural Network not enabled or initialized")
return False
def inference_worker():
"""Worker function for the inference thread"""
model_type = os.environ.get('NN_MODEL_TYPE', 'cnn')
timeframe = os.environ.get('NN_TIMEFRAME', '1h')
logger.info(f"Starting neural network inference thread with {interval_seconds}s interval")
logger.info(f"Using model type: {model_type}, timeframe: {timeframe}")
# Wait a bit for charts to initialize
time.sleep(5)
# Track active charts
active_charts = []
while True:
try:
# Find active charts if we don't have them yet
if not active_charts and 'charts' in globals():
active_charts = globals()['charts']
logger.info(f"Found {len(active_charts)} active charts for NN signals")
# Run inference
result = nn_orchestrator.run_inference_pipeline(
model_type=model_type,
timeframe=timeframe
)
if result:
# Log the result
logger.info(f"Neural network inference result: {result}")
# Add signal to charts
if active_charts:
try:
if 'action' in result:
action = result['action']
timestamp = datetime.fromisoformat(result['timestamp'].replace('Z', '+00:00'))
# Get probability if available
probability = None
if 'probability' in result:
probability = result['probability']
elif 'probabilities' in result:
probability = result['probabilities'].get(action, None)
# Add signal to each chart
for chart in active_charts:
if hasattr(chart, 'add_nn_signal'):
chart.add_nn_signal(action, timestamp, probability)
except Exception as e:
logger.error(f"Error adding NN signal to chart: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Sleep for the interval
time.sleep(interval_seconds)
except Exception as e:
logger.error(f"Error in inference thread: {str(e)}")
import traceback
logger.error(traceback.format_exc())
time.sleep(5) # Wait a bit before retrying
# Create and start the thread
nn_inference_thread = threading.Thread(target=inference_worker, daemon=True)
nn_inference_thread.start()
return True
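# Example (sketch): the optional NN integration above is configured purely through
# environment variables. An illustrative invocation, using the defaults that the
# os.environ.get() calls fall back to:
#
#   ENABLE_NN_MODELS=1 NN_SYMBOL=ETH/USDT NN_TIMEFRAMES=1m,5m,1h,4h,1d \
#   NN_INFERENCE_INTERVAL=60 NN_MODEL_TYPE=cnn NN_TIMEFRAME=1h python realtime.py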
# Try to get local timezone, default to Sofia/EET if not available
try:
local_timezone = tzlocal.get_localzone()
# Get timezone name safely
try:
# tzlocal may return a pytz timezone (.zone) or a zoneinfo.ZoneInfo (.key)
if hasattr(local_timezone, 'zone'):
tz_name = local_timezone.zone
elif hasattr(local_timezone, 'key'):
tz_name = local_timezone.key
else:
tz_name = str(local_timezone)
except Exception:
tz_name = "Local"
logger.info(f"Detected local timezone: {local_timezone} ({tz_name})")
except Exception as e:
logger.warning(f"Could not detect local timezone: {str(e)}. Defaulting to Sofia/EET")
local_timezone = pytz.timezone('Europe/Sofia')
tz_name = "Europe/Sofia"
def convert_to_local_time(timestamp):
"""Convert timestamp to local timezone"""
try:
if isinstance(timestamp, pd.Timestamp):
dt = timestamp.to_pydatetime()
elif isinstance(timestamp, np.datetime64):
dt = pd.Timestamp(timestamp).to_pydatetime()
elif isinstance(timestamp, str):
dt = pd.to_datetime(timestamp).to_pydatetime()
else:
dt = timestamp
# If datetime is naive (no timezone), assume it's UTC
if dt.tzinfo is None:
dt = dt.replace(tzinfo=pytz.UTC)
# Convert to local timezone
local_dt = dt.astimezone(local_timezone)
return local_dt
except Exception as e:
logger.error(f"Error converting timestamp to local time: {str(e)}")
return timestamp
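# Example (sketch): naive timestamps are assumed to be UTC and shifted into the
# detected local timezone, e.g. with the Europe/Sofia fallback above:
#   convert_to_local_time(pd.Timestamp("2025-04-01 12:00:00"))
#   -> Timestamp 2025-04-01 15:00:00+03:00 (EEST)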
class TickStorage:
"""Simple storage for ticks and candles"""
def __init__(self):
self.ticks = []
self.candles = {} # Organized by timeframe key (e.g., '1s', '1m', '1h')
self.latest_price = None
self.last_update = datetime.now()
# Initialize empty candle arrays for different timeframes
for timeframe in ['1s', '5s', '15s', '30s', '1m', '5m', '15m', '30m', '1h', '4h', '1d']:
self.candles[timeframe] = []
def add_tick(self, price, volume=0, timestamp=None):
"""Add a tick to the storage"""
if timestamp is None:
timestamp = datetime.now()
# Ensure timestamp is datetime
if isinstance(timestamp, (int, float)):
timestamp = datetime.fromtimestamp(timestamp)
tick = {
'price': price,
'volume': volume,
'timestamp': timestamp
}
self.ticks.append(tick)
self.latest_price = price
self.last_update = datetime.now()
# Keep only last 10000 ticks
if len(self.ticks) > 10000:
self.ticks = self.ticks[-10000:]
# Update all timeframe candles
self._update_all_candles(tick)
def get_latest_price(self):
"""Get the latest price"""
return self.latest_price
def _update_all_candles(self, tick):
"""Update all candle timeframes with the new tick"""
# Define intervals in seconds
intervals = {
'1s': 1,
'5s': 5,
'15s': 15,
'30s': 30,
'1m': 60,
'5m': 300,
'15m': 900,
'30m': 1800,
'1h': 3600,
'4h': 14400,
'1d': 86400
}
# Update each timeframe
for interval_key, seconds in intervals.items():
self._update_candles_for_timeframe(interval_key, seconds, tick)
def _update_candles_for_timeframe(self, interval_key, interval_seconds, tick):
"""Update candles for a specific timeframe"""
# Get or create the current candle
current_candle = self._get_current_candle(interval_key, tick['timestamp'], interval_seconds)
# If this is a new candle, initialize it with the tick price
if current_candle['open'] == 0.0:
current_candle['open'] = tick['price']
current_candle['high'] = tick['price']
current_candle['low'] = tick['price']
# Update the candle with the new tick
if current_candle['high'] < tick['price']:
current_candle['high'] = tick['price']
if current_candle['low'] > tick['price'] or current_candle['low'] == 0:
current_candle['low'] = tick['price']
current_candle['close'] = tick['price']
current_candle['volume'] += tick['volume']
# Limit the number of candles to keep for each timeframe
# Keep more candles for shorter timeframes, fewer for longer ones
max_candles = {
'1s': 1000, # ~16 minutes of 1s data
'5s': 1000, # ~83 minutes of 5s data
'15s': 800, # ~3.3 hours of 15s data
'30s': 600, # ~5 hours of 30s data
'1m': 500, # ~8 hours of 1m data
'5m': 300, # ~25 hours of 5m data
'15m': 200, # ~50 hours of 15m data
'30m': 150, # ~3 days of 30m data
'1h': 168, # 7 days of 1h data
'4h': 90, # ~15 days of 4h data
'1d': 365 # 1 year of daily data
}
# Trim candles list if needed
if len(self.candles[interval_key]) > max_candles.get(interval_key, 500):
self.candles[interval_key] = self.candles[interval_key][-max_candles.get(interval_key, 500):]
def _get_current_candle(self, interval_key, timestamp, interval_seconds):
"""Get the current candle for the given interval, or create a new one"""
# Calculate the candle start time based on the timeframe
candle_start = self._calculate_candle_start(timestamp, interval_seconds)
# Check if we already have a candle for this time
for candle in self.candles[interval_key]:
if candle['timestamp'] == candle_start:
return candle
# Create a new candle
candle = {
'timestamp': candle_start,
'open': 0.0,
'high': 0.0,
'low': float('inf'),
'close': 0.0,
'volume': 0
}
self.candles[interval_key].append(candle)
return candle
def _calculate_candle_start(self, timestamp, interval_seconds):
"""Calculate the start time of a candle based on interval"""
# Seconds timeframes (1s, 5s, 15s, 30s)
if interval_seconds < 60:
# Round down to the nearest multiple of interval_seconds
seconds_since_hour = timestamp.second + timestamp.minute * 60
candle_seconds = (seconds_since_hour // interval_seconds) * interval_seconds
candle_minute = candle_seconds // 60
candle_second = candle_seconds % 60
return timestamp.replace(
microsecond=0,
second=candle_second,
minute=candle_minute
)
# Minute timeframes (1m, 5m, 15m, 30m)
elif interval_seconds < 3600:
minutes_in_interval = interval_seconds // 60
return timestamp.replace(
microsecond=0,
second=0,
minute=(timestamp.minute // minutes_in_interval) * minutes_in_interval
)
# Hour timeframes (1h, 4h)
elif interval_seconds < 86400:
hours_in_interval = interval_seconds // 3600
return timestamp.replace(
microsecond=0,
second=0,
minute=0,
hour=(timestamp.hour // hours_in_interval) * hours_in_interval
)
# Day timeframe (1d)
else:
return timestamp.replace(
microsecond=0,
second=0,
minute=0,
hour=0
)
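# Example (sketch): how an illustrative tick at 12:34:56 is bucketed by the method
# above: 15s -> 12:34:45, 1m -> 12:34:00, 5m -> 12:30:00, 4h -> 12:00:00,
# 1d -> 00:00:00 (all on the same date).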
def get_candles(self, interval='1m'):
"""Get candles for the specified interval"""
# Convert legacy interval format to new format
if isinstance(interval, int):
# Convert seconds to the appropriate key
if interval < 60:
interval_key = f"{interval}s"
elif interval < 3600:
interval_key = f"{interval // 60}m"
elif interval < 86400:
interval_key = f"{interval // 3600}h"
else:
interval_key = f"{interval // 86400}d"
else:
interval_key = interval
# Ensure the interval key exists in our candles dict
if interval_key not in self.candles:
logger.warning(f"Invalid interval key: {interval_key}")
return None
if not self.candles[interval_key]:
logger.warning(f"No candles available for {interval_key}")
return None
# Convert to DataFrame
df = pd.DataFrame(self.candles[interval_key])
if df.empty:
return None
# Set timestamp as index
df.set_index('timestamp', inplace=True)
# Sort by timestamp
df = df.sort_index()
return df
def load_from_file(self, file_path):
"""Load ticks from a file"""
try:
df = pd.read_csv(file_path)
for _, row in df.iterrows():
if 'timestamp' in row:
timestamp = pd.to_datetime(row['timestamp'])
else:
timestamp = None
self.add_tick(
price=row.get('price', row.get('close', 0)),
volume=row.get('volume', 0),
timestamp=timestamp
)
logger.info(f"Loaded {len(df)} ticks from {file_path}")
except Exception as e:
logger.error(f"Error loading ticks from file: {str(e)}")
def load_historical_data(self, historical_data, symbol):
"""Load historical data"""
try:
# Load data for different timeframes
timeframes = [
(1, '1s'), # 1 second
(60, '1m'), # 1 minute
(300, '5m'), # 5 minutes
(900, '15m'), # 15 minutes
(3600, '1h'), # 1 hour
(14400, '4h'), # 4 hours
(86400, '1d') # 1 day
]
for interval_seconds, interval_key in timeframes:
# Set appropriate limits based on timeframe
limit = 1000 # Default
if interval_seconds == 1:
limit = 500 # 1s is too much data, limit to 500
elif interval_seconds < 60:
limit = 750 # For seconds-level data
elif interval_seconds < 300:
limit = 1000 # 1m
elif interval_seconds < 900:
limit = 500 # 5m
elif interval_seconds < 3600:
limit = 300 # 15m
else:
limit = 200 # hourly/daily data
df = historical_data.get_historical_candles(symbol, interval_seconds, limit)
if df is not None and not df.empty:
logger.info(f"Loaded {len(df)} historical candles for {symbol} ({interval_key})")
# Convert to our candle format and store
for _, row in df.iterrows():
candle = {
'timestamp': row['timestamp'],
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close'],
'volume': row['volume']
}
self.candles[interval_key].append(candle)
# For 1m and above, also use the close price to simulate ticks
# but don't do this for seconds-level data as it creates too many ticks
if interval_seconds >= 60 and interval_key == '1m':
self.add_tick(
price=row['close'],
volume=row['volume'],
timestamp=row['timestamp']
)
# Update latest price from most recent candle
if len(df) > 0:
self.latest_price = df.iloc[-1]['close']
logger.info(f"Completed loading historical data for {symbol}")
except Exception as e:
logger.error(f"Error loading historical data: {str(e)}")
import traceback
logger.error(traceback.format_exc())
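# Example (sketch): seeding a TickStorage with Binance history and reading back
# 1-minute candles as a DataFrame. The tick values are illustrative, and this
# helper is not called anywhere else in the module.
def _example_seed_tick_storage():
    storage = TickStorage()
    storage.load_historical_data(BinanceHistoricalData(), "ETH/USDT")
    storage.add_tick(price=1234.5, volume=0.25)  # illustrative live tick
    candles_1m = storage.get_candles('1m')  # DataFrame indexed by timestamp, or None
    return candles_1m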
class Position:
"""Represents a trading position"""
def __init__(self, action, entry_price, amount, timestamp=None, trade_id=None, fee_rate=0.001):
self.action = action
self.entry_price = entry_price
self.amount = amount
self.entry_timestamp = timestamp or datetime.now()
self.exit_timestamp = None
self.exit_price = None
self.pnl = None
self.is_open = True
self.trade_id = trade_id or str(uuid.uuid4())[:8]
self.fee_rate = fee_rate
self.paid_fee = entry_price * amount * fee_rate # Calculate entry fee
def close(self, exit_price, exit_timestamp=None):
"""Close an open position"""
self.exit_price = exit_price
self.exit_timestamp = exit_timestamp or datetime.now()
self.is_open = False
# Calculate P&L
if self.action == "BUY":
price_diff = self.exit_price - self.entry_price
# Calculate fee for exit trade
exit_fee = exit_price * self.amount * self.fee_rate
self.paid_fee += exit_fee # Add exit fee to total paid fee
self.pnl = (price_diff * self.amount) - self.paid_fee
else: # SELL
price_diff = self.entry_price - self.exit_price
# Calculate fee for exit trade
exit_fee = exit_price * self.amount * self.fee_rate
self.paid_fee += exit_fee # Add exit fee to total paid fee
self.pnl = (price_diff * self.amount) - self.paid_fee
return self.pnl
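# Example (sketch): fee-aware PnL from the class above, with illustrative numbers.
# A BUY of 0.1 units at $2000 pays an entry fee of 2000 * 0.1 * 0.001 = $0.20;
# closing at $2050 adds an exit fee of 2050 * 0.1 * 0.001 = $0.205, so
# close(2050) returns (2050 - 2000) * 0.1 - 0.405 = $4.595.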
class RealTimeChart:
"""Real-time chart using Dash and Plotly"""
def __init__(self, symbol: str):
"""Initialize the chart with a symbol"""
self.symbol = symbol
self.tick_storage = TickStorage()
self.latest_price = None
self.latest_volume = None
self.latest_timestamp = None
self.positions = [] # List to store positions
self.accumulative_pnl = 0.0 # Track total PnL
self.current_balance = 100.0 # Start with $100 balance
# Store historical data for different timeframes
self.timeframe_data = {
'1s': [],
'5s': [],
'15s': [],
'1m': [],
'5m': [],
'15m': [],
'1h': [],
'4h': [],
'1d': []
}
# Initialize Dash app
self.app = dash.Dash(__name__, external_stylesheets=[dbc.themes.DARKLY])
# Define button styles
self.button_style = {
'padding': '5px 10px',
'margin': '0 5px',
'backgroundColor': '#444',
'color': 'white',
'border': 'none',
'borderRadius': '5px',
'cursor': 'pointer'
}
self.active_button_style = {
'padding': '5px 10px',
'margin': '0 5px',
'backgroundColor': '#007bff',
'color': 'white',
'border': 'none',
'borderRadius': '5px',
'cursor': 'pointer',
'boxShadow': '0 0 5px rgba(0, 123, 255, 0.5)'
}
# Create the layout
self.app.layout = html.Div([
# Header section with title and current price
html.Div([
html.H1(f"{symbol} Real-Time Chart", className="display-4"),
# Current price ticker
html.Div([
html.H4("Current Price:", style={"display": "inline-block", "marginRight": "10px"}),
html.H3(id="current-price", style={"display": "inline-block", "color": "#17a2b8"}),
html.Div([
html.H5("Balance:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}),
html.H5(id="current-balance", style={"display": "inline-block", "color": "#28a745"}),
], style={"display": "inline-block", "marginLeft": "40px"}),
html.Div([
html.H5("Accumulated PnL:", style={"display": "inline-block", "marginRight": "10px", "marginLeft": "30px"}),
html.H5(id="accumulated-pnl", style={"display": "inline-block", "color": "#ffc107"}),
], style={"display": "inline-block", "marginLeft": "40px"}),
], style={"textAlign": "center", "margin": "20px 0"}),
], style={"textAlign": "center", "marginBottom": "20px"}),
# Add interval component for periodic updates
dcc.Interval(
id='interval-component',
interval=500, # in milliseconds
n_intervals=0
),
# Add timeframe selection buttons
html.Div([
html.Button('1s', id='btn-1s', n_clicks=0, style=self.active_button_style),
html.Button('5s', id='btn-5s', n_clicks=0, style=self.button_style),
html.Button('15s', id='btn-15s', n_clicks=0, style=self.button_style),
html.Button('1m', id='btn-1m', n_clicks=0, style=self.button_style),
html.Button('5m', id='btn-5m', n_clicks=0, style=self.button_style),
html.Button('15m', id='btn-15m', n_clicks=0, style=self.button_style),
html.Button('1h', id='btn-1h', n_clicks=0, style=self.button_style),
], style={"textAlign": "center", "marginBottom": "20px"}),
# Store for the selected timeframe
dcc.Store(id='interval-store', data={'interval': 1}),
# Chart containers
dcc.Graph(id='live-chart', style={"height": "600px"}),
dcc.Graph(id='secondary-charts', style={"height": "500px"}),
# Positions list container
html.Div(id='positions-list')
])
# Setup callbacks
self._setup_callbacks()
def _setup_callbacks(self):
"""Set up all the callbacks for the dashboard"""
# Callback for timeframe selection
@self.app.callback(
[Output('interval-store', 'data'),
Output('btn-1s', 'style'),
Output('btn-5s', 'style'),
Output('btn-15s', 'style'),
Output('btn-1m', 'style'),
Output('btn-5m', 'style'),
Output('btn-15m', 'style'),
Output('btn-1h', 'style')],
[Input('btn-1s', 'n_clicks'),
Input('btn-5s', 'n_clicks'),
Input('btn-15s', 'n_clicks'),
Input('btn-1m', 'n_clicks'),
Input('btn-5m', 'n_clicks'),
Input('btn-15m', 'n_clicks'),
Input('btn-1h', 'n_clicks')],
[dash.dependencies.State('interval-store', 'data')]
)
def update_interval(n1, n5, n15, n60, n300, n900, n3600, data):
ctx = dash.callback_context
if not ctx.triggered:
# Default state (1s selected)
return ({'interval': 1},
self.active_button_style,
self.button_style,
self.button_style,
self.button_style,
self.button_style,
self.button_style,
self.button_style)
button_id = ctx.triggered[0]['prop_id'].split('.')[0]
# Initialize all buttons to inactive
button_styles = [self.button_style] * 7
# Set the active button and interval
if button_id == 'btn-1s':
button_styles[0] = self.active_button_style
return ({'interval': 1}, *button_styles)
elif button_id == 'btn-5s':
button_styles[1] = self.active_button_style
return ({'interval': 5}, *button_styles)
elif button_id == 'btn-15s':
button_styles[2] = self.active_button_style
return ({'interval': 15}, *button_styles)
elif button_id == 'btn-1m':
button_styles[3] = self.active_button_style
return ({'interval': 60}, *button_styles)
elif button_id == 'btn-5m':
button_styles[4] = self.active_button_style
return ({'interval': 300}, *button_styles)
elif button_id == 'btn-15m':
button_styles[5] = self.active_button_style
return ({'interval': 900}, *button_styles)
elif button_id == 'btn-1h':
button_styles[6] = self.active_button_style
return ({'interval': 3600}, *button_styles)
# Default - keep current interval
current_interval = data.get('interval', 1)
# Set the appropriate button as active
if current_interval == 1:
button_styles[0] = self.active_button_style
elif current_interval == 5:
button_styles[1] = self.active_button_style
elif current_interval == 15:
button_styles[2] = self.active_button_style
elif current_interval == 60:
button_styles[3] = self.active_button_style
elif current_interval == 300:
button_styles[4] = self.active_button_style
elif current_interval == 900:
button_styles[5] = self.active_button_style
elif current_interval == 3600:
button_styles[6] = self.active_button_style
return (data, *button_styles)
# Main update callback
@self.app.callback(
[Output('live-chart', 'figure'),
Output('secondary-charts', 'figure'),
Output('positions-list', 'children'),
Output('current-price', 'children'),
Output('current-balance', 'children'),
Output('accumulated-pnl', 'children')],
[Input('interval-component', 'n_intervals'),
Input('interval-store', 'data')]
)
def update_all(n, interval_data):
try:
# Get selected interval
interval = interval_data.get('interval', 1)
# Get updated chart figures
main_fig = self._update_main_chart(interval)
secondary_fig = self._update_secondary_charts()
# Get updated positions list
positions = self._get_position_list_rows()
# Format the current price
current_price = "$ ---.--"
if self.latest_price is not None:
current_price = f"${self.latest_price:.2f}"
# Format balance and PnL
balance_text = f"${self.current_balance:.2f}"
pnl_text = f"${self.accumulative_pnl:.2f}"
return main_fig, secondary_fig, positions, current_price, balance_text, pnl_text
except Exception as e:
logger.error(f"Error in update callback: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Return empty updates on error
return {}, {}, [], "Error", "$0.00", "$0.00"
def _update_main_chart(self, interval=1):
"""Update the main chart with the selected timeframe"""
try:
# Get candle data for the selected interval
candles = self.get_candles(interval_seconds=interval)
if not candles or len(candles) == 0:
# Return empty chart if no data
return go.Figure()
# Create the candlestick chart
fig = go.Figure()
# Add candlestick trace
fig.add_trace(go.Candlestick(
x=[c['timestamp'] for c in candles],
open=[c['open'] for c in candles],
high=[c['high'] for c in candles],
low=[c['low'] for c in candles],
close=[c['close'] for c in candles],
name='Price'
))
# Add volume as a bar chart below
fig.add_trace(go.Bar(
x=[c['timestamp'] for c in candles],
y=[c['volume'] for c in candles],
name='Volume',
marker=dict(
color='rgba(0, 0, 255, 0.5)',
),
opacity=0.5,
yaxis='y2'
))
# Add buy/sell markers for trades
if hasattr(self, 'positions') and self.positions:
buy_times = []
buy_prices = []
sell_times = []
sell_prices = []
# Use all positions for chart display
# Filter recent ones based on visible time range
now = datetime.now()
time_limit = now - timedelta(hours=24) # Show at most 24h of trades
for position in self.positions:
if position.entry_timestamp > time_limit:
if position.action == "BUY":
buy_times.append(position.entry_timestamp)
buy_prices.append(position.entry_price)
elif position.action == "SELL" and position.exit_timestamp:
sell_times.append(position.exit_timestamp)
sell_prices.append(position.exit_price)
# Add buy markers (green triangles pointing up)
if buy_times:
fig.add_trace(go.Scatter(
x=buy_times,
y=buy_prices,
mode='markers',
name='Buy',
marker=dict(
symbol='triangle-up',
size=10,
color='green',
line=dict(width=1, color='black')
)
))
# Add sell markers (red triangles pointing down)
if sell_times:
fig.add_trace(go.Scatter(
x=sell_times,
y=sell_prices,
mode='markers',
name='Sell',
marker=dict(
symbol='triangle-down',
size=10,
color='red',
line=dict(width=1, color='black')
)
))
# Update layout
timeframe_label = f"{interval}s" if interval < 60 else f"{interval//60}m" if interval < 3600 else f"{interval//3600}h"
fig.update_layout(
title=f'{self.symbol} Price ({timeframe_label})',
xaxis_title='Time',
yaxis_title='Price',
template='plotly_dark',
xaxis_rangeslider_visible=False,
height=600,
hovermode='x unified',
legend=dict(
orientation="h",
yanchor="bottom",
y=1.02,
xanchor="right",
x=1
),
yaxis=dict(
domain=[0.25, 1]
),
yaxis2=dict(
domain=[0, 0.2],
title='Volume'
),
)
# Add timestamp to show when chart was last updated
fig.add_annotation(
text=f"Last updated: {datetime.now().strftime('%H:%M:%S')}",
xref="paper", yref="paper",
x=0.98, y=0.01,
showarrow=False,
font=dict(size=10, color="gray")
)
return fig
except Exception as e:
logger.error(f"Error updating main chart: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return go.Figure() # Return empty figure on error
def _update_secondary_charts(self):
"""Create secondary charts with multiple timeframes (1m, 1h, 1d)"""
try:
# Create subplot with 3 rows
fig = make_subplots(
rows=3, cols=1,
shared_xaxes=False,
vertical_spacing=0.05,
subplot_titles=('1 Minute', '1 Hour', '1 Day')
)
# Get data for each timeframe
candles_1m = self.get_candles(interval_seconds=60)
candles_1h = self.get_candles(interval_seconds=3600)
candles_1d = self.get_candles(interval_seconds=86400)
# 1-minute chart (row 1)
if candles_1m and len(candles_1m) > 0:
fig.add_trace(go.Candlestick(
x=[c['timestamp'] for c in candles_1m],
open=[c['open'] for c in candles_1m],
high=[c['high'] for c in candles_1m],
low=[c['low'] for c in candles_1m],
close=[c['close'] for c in candles_1m],
name='1m Price',
showlegend=False
), row=1, col=1)
# 1-hour chart (row 2)
if candles_1h and len(candles_1h) > 0:
fig.add_trace(go.Candlestick(
x=[c['timestamp'] for c in candles_1h],
open=[c['open'] for c in candles_1h],
high=[c['high'] for c in candles_1h],
low=[c['low'] for c in candles_1h],
close=[c['close'] for c in candles_1h],
name='1h Price',
showlegend=False
), row=2, col=1)
# 1-day chart (row 3)
if candles_1d and len(candles_1d) > 0:
fig.add_trace(go.Candlestick(
x=[c['timestamp'] for c in candles_1d],
open=[c['open'] for c in candles_1d],
high=[c['high'] for c in candles_1d],
low=[c['low'] for c in candles_1d],
close=[c['close'] for c in candles_1d],
name='1d Price',
showlegend=False
), row=3, col=1)
# Update layout
fig.update_layout(
height=500,
template='plotly_dark',
margin=dict(l=50, r=50, t=30, b=30),
showlegend=False,
hovermode='x unified'
)
# Disable rangesliders for cleaner look
fig.update_xaxes(rangeslider_visible=False)
return fig
except Exception as e:
logger.error(f"Error updating secondary charts: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return go.Figure() # Return empty figure on error
def _get_position_list_rows(self):
"""Generate HTML for the positions list (last 10 positions only)"""
try:
if not hasattr(self, 'positions') or not self.positions:
# Return placeholder if no positions
return html.Div("No positions to display", style={"textAlign": "center", "padding": "20px"})
# Create table headers
table_header = [
html.Thead(html.Tr([
html.Th("ID"),
html.Th("Action"),
html.Th("Entry Price"),
html.Th("Exit Price"),
html.Th("Amount"),
html.Th("PnL"),
html.Th("Time")
]))
]
# Create table rows for only the last 10 positions to avoid overcrowding
rows = []
last_positions = self.positions[-10:] if len(self.positions) > 10 else self.positions
for position in last_positions:
# Format times
entry_time = position.entry_timestamp.strftime("%H:%M:%S")
exit_time = position.exit_timestamp.strftime("%H:%M:%S") if position.exit_timestamp else "-"
# Format PnL
pnl_value = position.pnl if position.pnl is not None else 0
pnl_text = f"${pnl_value:.2f}" if position.pnl is not None else "-"
pnl_style = {"color": "green" if position.pnl and position.pnl > 0 else "red"}
# Create row
row = html.Tr([
html.Td(position.trade_id),
html.Td(position.action),
html.Td(f"${position.entry_price:.2f}"),
html.Td(f"${position.exit_price:.2f}" if position.exit_price else "-"),
html.Td(f"{position.amount:.4f}"),
html.Td(pnl_text, style=pnl_style),
html.Td(f"{entry_time}{exit_time}")
])
rows.append(row)
table_body = [html.Tbody(rows)]
# Add summary row for total PnL and other statistics
total_trades = len(self.positions)
winning_trades = sum(1 for p in self.positions if p.pnl and p.pnl > 0)
win_rate = winning_trades / total_trades * 100 if total_trades > 0 else 0
# Format display colors for PnL
pnl_color = "green" if self.accumulative_pnl >= 0 else "red"
summary_row = html.Tr([
html.Td("SUMMARY", colSpan=2, style={"fontWeight": "bold"}),
html.Td(f"Trades: {total_trades}"),
html.Td(f"Win Rate: {win_rate:.1f}%"),
html.Td("Total PnL:", style={"fontWeight": "bold"}),
html.Td(f"${self.accumulative_pnl:.2f}",
style={"color": pnl_color, "fontWeight": "bold"}),
html.Td(f"Balance: ${self.current_balance:.2f}")
], style={"backgroundColor": "rgba(80, 80, 80, 0.3)"})
# Create the table with improved styling
table = html.Table(
table_header + table_body + [html.Tfoot([summary_row])],
style={
"width": "100%",
"textAlign": "center",
"borderCollapse": "collapse",
"marginTop": "20px"
},
className="table table-striped table-dark"
)
return table
except Exception as e:
logger.error(f"Error generating position list: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return html.Div("Error displaying positions")
def get_candles(self, interval_seconds=60):
"""Get candles for the specified interval"""
try:
# Get candles from tick storage
interval_key = self._get_interval_key(interval_seconds)
df = self.tick_storage.get_candles(interval_key)
if df is None or df.empty:
logger.warning(f"No candle data available for {interval_key}")
return [] # Return empty list if no data
# Convert dataframe to list of dictionaries
candles = []
for idx, row in df.iterrows():
candle = {
'timestamp': idx,
'open': row['open'],
'high': row['high'],
'low': row['low'],
'close': row['close'],
'volume': row['volume']
}
candles.append(candle)
return candles
except Exception as e:
logger.error(f"Error getting candles: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return [] # Return empty list on error
def _get_interval_key(self, interval_seconds):
"""Convert interval seconds to a key used in the tick storage"""
if interval_seconds < 60:
return f"{interval_seconds}s"
elif interval_seconds < 3600:
return f"{interval_seconds // 60}m"
elif interval_seconds < 86400:
return f"{interval_seconds // 3600}h"
else:
return f"{interval_seconds // 86400}d"
async def start_websocket(self):
"""Start the websocket connection for real-time data"""
try:
# Initialize websocket
self.websocket = ExchangeWebSocket(self.symbol)
await self.websocket.connect()
logger.info(f"WebSocket connected for {self.symbol}")
# Start receiving data
while self.websocket.running:
try:
data = await self.websocket.receive()
if data:
# Process the received data
if 'price' in data:
# Update tick storage
self.tick_storage.add_tick(
price=data['price'],
volume=data.get('volume', 0),
timestamp=datetime.fromtimestamp(data['timestamp'] / 1000) # Convert ms to datetime
)
# Store latest values
self.latest_price = data['price']
self.latest_volume = data.get('volume', 0)
self.latest_timestamp = datetime.fromtimestamp(data['timestamp'] / 1000)
# Log occasional price updates (every 500 messages)
if hasattr(self.websocket.ws, 'message_count') and self.websocket.ws.message_count % 500 == 0:
logger.info(f"Current {self.symbol} price: ${self.latest_price:.2f}")
except Exception as e:
logger.error(f"Error processing websocket data: {str(e)}")
await asyncio.sleep(1) # Wait before retrying
except Exception as e:
logger.error(f"WebSocket error for {self.symbol}: {str(e)}")
import traceback
logger.error(traceback.format_exc())
finally:
if hasattr(self, 'websocket'):
await self.websocket.close()
def run(self, host='localhost', port=8050):
"""Run the Dash app on the specified host and port"""
try:
logger.info(f"Starting Dash app for {self.symbol} on {host}:{port}")
self.app.run(debug=False, use_reloader=False, host=host, port=port)
except Exception as e:
logger.error(f"Error running Dash app: {str(e)}")
import traceback
logger.error(traceback.format_exc())
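# Example (sketch): serving a single chart without the multi-symbol main() at the
# bottom of the file. Host and port are illustrative; the websocket runs in a
# daemon thread so Dash can own the main thread.
def _example_run_single_chart():
    chart = RealTimeChart("ETH/USDT")
    chart.tick_storage.load_historical_data(BinanceHistoricalData(), chart.symbol)
    Thread(target=lambda: asyncio.run(chart.start_websocket()), daemon=True).start()
    chart.run(host='localhost', port=8050)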
class BinanceWebSocket:
"""Binance WebSocket implementation for real-time tick data"""
def __init__(self, symbol: str):
self.symbol = symbol.replace('/', '').lower()
self.ws = None
self.running = False
self.reconnect_delay = 1
self.max_reconnect_delay = 60
self.message_count = 0
# Binance WebSocket configuration
self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@trade"
logger.info(f"Initialized Binance WebSocket for symbol: {self.symbol}")
async def connect(self):
while True:
try:
logger.info(f"Attempting to connect to {self.ws_url}")
self.ws = await websockets.connect(self.ws_url)
logger.info("WebSocket connection established")
self.running = True
self.reconnect_delay = 1
logger.info(f"Successfully connected to Binance WebSocket for {self.symbol}")
return True
except Exception as e:
logger.error(f"WebSocket connection error: {str(e)}")
await asyncio.sleep(self.reconnect_delay)
self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
continue
async def receive(self) -> Optional[Dict]:
if not self.ws:
return None
try:
message = await self.ws.recv()
self.message_count += 1
if self.message_count % 100 == 0: # Log every 100th message to avoid spam
logger.info(f"Received message #{self.message_count}")
logger.debug(f"Raw message: {message[:200]}...")
data = json.loads(message)
# Process trade data
if 'e' in data and data['e'] == 'trade':
trade_data = {
'timestamp': data['T'], # Trade time
'price': float(data['p']), # Price
'volume': float(data['q']), # Quantity
'type': 'trade'
}
logger.debug(f"Processed trade data: {trade_data}")
return trade_data
return None
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket connection closed")
self.running = False
return None
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...")
return None
except Exception as e:
logger.error(f"Error receiving message: {str(e)}")
return None
async def close(self):
"""Close the WebSocket connection"""
if self.ws:
await self.ws.close()
class ExchangeWebSocket:
"""Generic WebSocket interface for cryptocurrency exchanges"""
def __init__(self, symbol: str, exchange: str = "binance"):
self.symbol = symbol
self.exchange = exchange.lower()
self.ws = None
# Initialize the appropriate WebSocket implementation
if self.exchange == "binance":
self.ws = BinanceWebSocket(symbol)
else:
raise ValueError(f"Unsupported exchange: {exchange}")
async def connect(self):
"""Connect to the exchange WebSocket"""
return await self.ws.connect()
async def receive(self) -> Optional[Dict]:
"""Receive data from the WebSocket"""
return await self.ws.receive()
async def close(self):
"""Close the WebSocket connection"""
await self.ws.close()
@property
def running(self):
"""Check if the WebSocket is running"""
return self.ws.running if self.ws else False
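# Example (sketch): consuming a handful of trade ticks directly from the generic
# websocket wrapper, outside of RealTimeChart. The symbol and tick count below
# are illustrative.
async def _example_print_ticks(symbol="ETH/USDT", max_ticks=10):
    ws = ExchangeWebSocket(symbol)
    await ws.connect()
    received = 0
    try:
        while ws.running and received < max_ticks:
            tick = await ws.receive()  # {'timestamp': ms, 'price': float, 'volume': float, 'type': 'trade'}
            if tick:
                print(tick)
                received += 1
    finally:
        await ws.close()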
async def main():
global charts # Make charts globally accessible for NN integration
symbols = ["ETH/USDT", "ETH/USDT"]
logger.info(f"Starting application for symbols: {symbols}")
# Initialize neural network if enabled
if NN_ENABLED:
logger.info("Initializing Neural Network integration...")
if setup_neural_network():
logger.info("Neural Network integration initialized successfully")
else:
logger.warning("Neural Network integration failed to initialize")
charts = []
websocket_tasks = []
# Create a chart and websocket task for each symbol
for symbol in symbols:
chart = RealTimeChart(symbol)
charts.append(chart)
websocket_tasks.append(asyncio.create_task(chart.start_websocket()))
# Run Dash in a separate thread to not block the event loop
server_threads = []
for i, chart in enumerate(charts):
port = 8050 + i # Use different ports for each chart
logger.info(f"Starting chart for {chart.symbol} on port {port}")
thread = Thread(target=lambda c=chart, p=port: c.run(port=p)) # Ensure correct port is passed
thread.daemon = True
thread.start()
server_threads.append(thread)
logger.info(f"Thread started for {chart.symbol} on port {port}")
try:
# Keep the main task running
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down...")
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
finally:
for task in websocket_tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Application terminated by user")