gogo2/realtime.py
2025-03-19 00:06:48 +02:00

2087 lines
87 KiB
Python

import asyncio
import json
import logging
import datetime
from typing import Dict, List, Optional
import websockets
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import dash
from dash import html, dcc
from dash.dependencies import Input, Output
import pandas as pd
import numpy as np
from collections import deque
import time
from threading import Thread
import requests
import os
from datetime import datetime, timedelta
# Root logging setup: DEBUG-level records are sent to a stream handler and to
# the file 'realtime_chart.log' (relative to the working directory). Every
# record is stamped with its source file name and line number.
_log_format = '%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
_log_handlers = [logging.StreamHandler(), logging.FileHandler('realtime_chart.log')]
logging.basicConfig(level=logging.DEBUG, format=_log_format, handlers=_log_handlers)
# Module-level logger used by the classes defined below.
logger = logging.getLogger(__name__)
class TradeTickStorage:
    """Store and manage raw trade ticks for display and candle formation.

    Ticks are plain dicts. Every tick must contain a ``'timestamp'`` key in
    epoch milliseconds (it is compared against ``time.time() * 1000``); the
    statistics and candle helpers additionally read ``'price'`` and
    ``'volume'``. Ticks older than ``max_age_seconds`` are pruned periodically
    on insert and before every read.
    """

    def __init__(self, max_age_seconds: int = 1800):  # 30 minutes by default
        """Create an empty store that keeps ticks for ``max_age_seconds``."""
        self.ticks = []
        self.max_age_seconds = max_age_seconds
        self.last_cleanup_time = time.time()
        # Longer retention means less frequent cleanup, clamped to 10-60 seconds.
        self.cleanup_interval = min(max(10, max_age_seconds // 60), 60)
        self.last_tick = None
        logger.info(f"Initialized TradeTickStorage with max age: {max_age_seconds} seconds, cleanup interval: {self.cleanup_interval} seconds")

    def add_tick(self, tick: Dict):
        """Add a new trade tick to storage."""
        self.ticks.append(tick)
        self.last_tick = tick
        logger.debug(f"Added tick: {tick}, total ticks: {len(self.ticks)}")
        # Only clean up periodically rather than on every tick
        current_time = time.time()
        if current_time - self.last_cleanup_time > self.cleanup_interval:
            self._cleanup()
            self.last_cleanup_time = current_time

    def _cleanup(self):
        """Remove ticks older than ``max_age_seconds``."""
        now = int(time.time() * 1000)  # Current time in ms
        cutoff = now - (self.max_age_seconds * 1000)
        old_count = len(self.ticks)
        self.ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff]
        removed = old_count - len(self.ticks)
        if removed > 0:
            logger.debug(f"Cleaned up {removed} old ticks, remaining: {len(self.ticks)}")

    def get_latest_price(self) -> Optional[float]:
        """Return the price of the most recently added tick, or None.

        Note: ``last_tick`` is not reset by cleanup, so the returned price may
        belong to a tick that has already aged out of ``ticks``.
        """
        if self.last_tick:
            return self.last_tick.get('price')
        elif self.ticks:
            # If last_tick not available but ticks exist, use the last tick
            self.last_tick = self.ticks[-1]
            return self.last_tick.get('price')
        return None

    def get_price_stats(self) -> Dict:
        """Return min/max/latest price, tick count and covered time span.

        ``age_seconds`` is the distance between the first and last stored
        tick. All price fields are None when the store is empty.
        """
        if not self.ticks:
            return {
                'min': None,
                'max': None,
                'latest': None,
                'count': 0,
                'age_seconds': 0
            }
        prices = [tick['price'] for tick in self.ticks]
        latest_timestamp = self.ticks[-1]['timestamp']
        oldest_timestamp = self.ticks[0]['timestamp']
        return {
            'min': min(prices),
            'max': max(prices),
            'latest': prices[-1],
            'count': len(prices),
            'age_seconds': (latest_timestamp - oldest_timestamp) / 1000
        }

    @staticmethod
    def _ticks_to_frame(ticks: List[Dict]) -> pd.DataFrame:
        """Build a DataFrame from ticks with 'timestamp' converted to datetime.

        Returns an empty DataFrame when there are no ticks, when the
        ``timestamp`` column is missing, or when the conversion fails.
        """
        df = pd.DataFrame(ticks)
        if df.empty:
            return df
        if 'timestamp' not in df.columns:
            logger.error("Tick data missing timestamp column")
            return pd.DataFrame()
        try:
            # Timestamps are stored as epoch milliseconds.
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        except Exception as e:
            logger.error(f"Error converting timestamps: {str(e)}", exc_info=True)
            return pd.DataFrame()
        return df

    def get_ticks_as_df(self) -> pd.DataFrame:
        """Return all stored ticks as a DataFrame with datetime timestamps."""
        if not self.ticks:
            logger.warning("No ticks available for DataFrame conversion")
            return pd.DataFrame()
        # Ensure we have fresh data
        self._cleanup()
        logger.debug(f"Converting timestamps for {len(self.ticks)} ticks")
        return self._ticks_to_frame(self.ticks)

    def get_candles(self, interval_seconds: int = 1, start_time_ms: Optional[int] = None, end_time_ms: Optional[int] = None) -> pd.DataFrame:
        """Convert ticks to OHLCV candles at the given interval.

        Args:
            interval_seconds: The interval in seconds for each candle
            start_time_ms: Optional start time in milliseconds for filtering
            end_time_ms: Optional end time in milliseconds for filtering
        Returns:
            DataFrame with columns timestamp/open/high/low/close/volume.
            Intervals without any tick are dropped. An empty DataFrame is
            returned when there is no data or resampling fails.
        """
        if not self.ticks:
            logger.warning("No ticks available for candle formation")
            return pd.DataFrame()
        # Ensure ticks are up to date
        self._cleanup()
        # Get ticks from specified time range if provided
        if start_time_ms is not None or end_time_ms is not None:
            logger.debug(f"Filtering ticks for time range from {start_time_ms} to {end_time_ms}")
            filtered_ticks = self.get_ticks_from_time(start_time_ms, end_time_ms)
            if not filtered_ticks:
                logger.warning("No ticks in the specified time range")
                return pd.DataFrame()
            # The filtered ticks still carry epoch-ms integers; resample()
            # needs a DatetimeIndex, so convert exactly like the unfiltered path.
            df = self._ticks_to_frame(filtered_ticks)
        else:
            # Use all available ticks
            df = self.get_ticks_as_df()
        if df.empty:
            logger.warning("Tick DataFrame is empty after filtering/conversion")
            return pd.DataFrame()
        logger.info(f"Preparing to create candles from {len(df)} ticks")
        try:
            # Use timestamp column for resampling
            df = df.set_index('timestamp')
            # Create interval string for resampling - use 's' instead of deprecated 'S'
            interval_str = f'{interval_seconds}s'
            logger.debug(f"Resampling with interval: {interval_str}")
            candles = df.resample(interval_str).agg({
                'price': ['first', 'max', 'min', 'last'],
                'volume': 'sum'
            })
            if candles.empty:
                logger.warning("Resampling produced empty dataframe - check timestamp distribution")
                # Show timestamp ranges to diagnose potential resampling issues
                if not df.empty:
                    min_time = df.index.min()
                    max_time = df.index.max()
                    logger.info(f"Tick timestamp range: {min_time} to {max_time}")
                return pd.DataFrame()
            # Flatten MultiIndex columns
            candles.columns = ['open', 'high', 'low', 'close', 'volume']
            # Reset index to get timestamp as column
            candles = candles.reset_index()
            # Empty intervals have NaN prices; drop them
            candles = candles.dropna()
            logger.debug(f"Generated {len(candles)} candles from {len(df)} ticks")
            return candles
        except Exception as e:
            logger.error(f"Error in candle formation: {str(e)}", exc_info=True)
            return pd.DataFrame()

    def get_ticks_from_time(self, start_time_ms: Optional[int] = None, end_time_ms: Optional[int] = None) -> List[Dict]:
        """Get ticks within a specific time range (both bounds inclusive).

        Args:
            start_time_ms: Start time in milliseconds (None for no lower bound)
            end_time_ms: End time in milliseconds (None for no upper bound)
        Returns:
            List of ticks within the time range
        """
        if not self.ticks:
            return []
        # Ensure ticks are updated
        self._cleanup()
        filtered_ticks = self.ticks
        if start_time_ms is not None:
            filtered_ticks = [tick for tick in filtered_ticks if tick['timestamp'] >= start_time_ms]
        if end_time_ms is not None:
            filtered_ticks = [tick for tick in filtered_ticks if tick['timestamp'] <= end_time_ms]
        logger.debug(f"Retrieved {len(filtered_ticks)} ticks from time range {start_time_ms} to {end_time_ms}")
        return filtered_ticks

    def get_time_based_stats(self) -> Dict:
        """Get statistics about the ticks organized by time periods.

        Returns:
            Dictionary with overall counters plus a ``'periods'`` mapping for
            the last 1, 5, 15 and 30 minutes. Periods that contain no tick
            are omitted from the mapping.
        """
        if not self.ticks:
            return {
                'total_ticks': 0,
                'periods': {}
            }
        # Ensure ticks are updated
        self._cleanup()
        now = int(time.time() * 1000)  # Current time in ms
        # Period name -> length in minutes
        period_minutes = {'1min': 1, '5min': 5, '15min': 15, '30min': 30}
        stats = {
            'total_ticks': len(self.ticks),
            'oldest_tick': self.ticks[0]['timestamp'] if self.ticks else None,
            'newest_tick': self.ticks[-1]['timestamp'] if self.ticks else None,
            'time_span_seconds': (self.ticks[-1]['timestamp'] - self.ticks[0]['timestamp']) / 1000 if self.ticks else 0,
            'periods': {}
        }
        for period_name, minutes in period_minutes.items():
            cutoff_time = now - (minutes * 60 * 1000)
            period_ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff_time]
            if not period_ticks:
                continue
            prices = [tick['price'] for tick in period_ticks]
            volumes = [tick.get('volume', 0) for tick in period_ticks]
            stats['periods'][period_name] = {
                'tick_count': len(period_ticks),
                'min_price': min(prices),
                'max_price': max(prices),
                'avg_price': sum(prices) / len(prices),
                'last_price': period_ticks[-1]['price'],
                'total_volume': sum(volumes),
                # Rate over the nominal period length, not the observed span
                'ticks_per_second': len(period_ticks) / (minutes * 60)
            }
        logger.debug(f"Generated time-based stats: {len(stats['periods'])} periods")
        return stats
class CandlestickData:
    """Rolling OHLCV candle series built incrementally from individual trades.

    Finished candles live in six parallel bounded deques; the candle still
    being formed is kept separately in ``current_candle``.
    """

    def __init__(self, max_length: int = 300):
        """Create empty series that each retain at most ``max_length`` candles."""
        self.timestamps = deque(maxlen=max_length)
        self.opens = deque(maxlen=max_length)
        self.highs = deque(maxlen=max_length)
        self.lows = deque(maxlen=max_length)
        self.closes = deque(maxlen=max_length)
        self.volumes = deque(maxlen=max_length)
        # No candle yet: every price field is None, volume starts at zero.
        self.current_candle = dict.fromkeys(('timestamp', 'open', 'high', 'low', 'close'))
        self.current_candle['volume'] = 0
        self.candle_interval = 1  # 1 second by default

    def update_from_trade(self, trade: Dict):
        """Fold one trade into the candle series.

        ``trade`` must provide ``'timestamp'`` (epoch milliseconds) and
        ``'price'``; ``'volume'`` defaults to 0. A trade whose bucket differs
        from the current candle's bucket archives the current candle and
        opens a new one.
        """
        trade_time = trade['timestamp']
        price = trade['price']
        volume = trade.get('volume', 0)
        # Truncate the timestamp down to the start of its candle bucket.
        bucket_ms = self.candle_interval * 1000
        candle_timestamp = int(trade_time / bucket_ms) * bucket_ms

        active = self.current_candle
        if active['timestamp'] == candle_timestamp:
            # Same bucket: stretch high/low, move close, accumulate volume.
            active['high'] = price if active['high'] is None else max(active['high'], price)
            active['low'] = price if active['low'] is None else min(active['low'], price)
            active['close'] = price
            active['volume'] += volume
            logger.debug(f"Updated current candle: {self.current_candle}")
            return

        if active['timestamp'] is not None:
            # Archive the finished candle before replacing it.
            archive = (
                (self.timestamps, 'timestamp'),
                (self.opens, 'open'),
                (self.highs, 'high'),
                (self.lows, 'low'),
                (self.closes, 'close'),
                (self.volumes, 'volume'),
            )
            for series, field in archive:
                series.append(active[field])
            logger.debug(f"New candle saved: {self.current_candle}")

        self.current_candle = {
            'timestamp': candle_timestamp,
            'open': price,
            'high': price,
            'low': price,
            'close': price,
            'volume': volume
        }
        logger.debug(f"New candle started: {self.current_candle}")

    def get_dataframe(self) -> pd.DataFrame:
        """Return finished candles plus the in-progress one as a DataFrame.

        The ``timestamp`` column is converted from epoch milliseconds to
        datetime when the frame is not empty.
        """
        columns = {
            'timestamp': list(self.timestamps),
            'open': list(self.opens),
            'high': list(self.highs),
            'low': list(self.lows),
            'close': list(self.closes),
            'volume': list(self.volumes)
        }
        # Include current candle in the dataframe if it exists
        if self.current_candle['timestamp'] is not None:
            for field, values in columns.items():
                values.append(self.current_candle[field])
        df = pd.DataFrame(columns)
        if not df.empty:
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        return df
class MEXCWebSocket:
    """MEXC-specific WebSocket implementation.

    Subscribes to the 1-minute kline channel of a single symbol and keeps the
    connection alive with periodic ``PING`` messages.
    """

    def __init__(self, symbol: str):
        """Prepare subscription messages for ``symbol`` (e.g. ``"ETH/USDT"``)."""
        self.symbol = symbol.replace('/', '').upper()
        self.ws = None
        self.running = False
        self.reconnect_delay = 1
        self.max_reconnect_delay = 60
        self.ping_interval = 20
        self.last_ping_time = 0
        self.message_count = 0
        # Strong reference to the keep-alive task. The event loop only holds
        # weak references to tasks, so an unreferenced task may be collected
        # mid-execution; keeping it also lets close() cancel it.
        self._ping_task = None
        # MEXC WebSocket configuration
        self.ws_url = "wss://wbs-api.mexc.com/ws"
        self.ws_sub_params = [
            f"spot@public.kline.v3.api@{self.symbol}@Min1"
        ]
        self.subscribe_msgs = [
            {
                "method": "SUBSCRIPTION",
                "params": self.ws_sub_params
            }
        ]
        logger.info(f"Initialized MEXC WebSocket for symbol: {self.symbol}")
        logger.debug(f"Subscribe messages: {json.dumps(self.subscribe_msgs)}")

    def _cancel_ping_task(self):
        """Cancel the keep-alive task if one is still pending."""
        task = self._ping_task
        self._ping_task = None
        if task is not None and not task.done():
            task.cancel()

    async def connect(self):
        """Connect and subscribe, retrying with exponential backoff.

        Returns:
            True once connected and subscribed; False when the server
            answers a subscription with "Not Subscribed". Connection errors
            are retried indefinitely.
        """
        while True:
            try:
                logger.info(f"Attempting to connect to {self.ws_url}")
                self.ws = await websockets.connect(self.ws_url)
                logger.info("WebSocket connection established")
                # Subscribe to the streams
                for msg in self.subscribe_msgs:
                    logger.info(f"Sending subscription message: {json.dumps(msg)}")
                    await self.ws.send(json.dumps(msg))
                    # Wait for subscription confirmation
                    response = await self.ws.recv()
                    if isinstance(response, bytes):
                        # A binary frame would make the substring test below
                        # raise TypeError and trigger a pointless reconnect.
                        response = response.decode('utf-8', errors='replace')
                    logger.info(f"Subscription response: {response}")
                    if "Not Subscribed" in response:
                        logger.error(f"Subscription error: {response}")
                        await self.unsubscribe()
                        await self.close()
                        return False
                self.running = True
                self.reconnect_delay = 1
                logger.info(f"Successfully connected to MEXC WebSocket for {self.symbol}")
                # Start ping task, replacing any leftover from a prior connection
                self._cancel_ping_task()
                self._ping_task = asyncio.create_task(self.ping_loop())
                return True
            except Exception as e:
                logger.error(f"WebSocket connection error: {str(e)}")
                await self.unsubscribe()
                # Release the failed socket before retrying so it is not leaked.
                if self.ws is not None:
                    try:
                        await self.ws.close()
                    except Exception as close_err:
                        logger.debug(f"Error closing failed WebSocket: {close_err}")
                    self.ws = None
                await asyncio.sleep(self.reconnect_delay)
                self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
                continue

    async def ping_loop(self):
        """Send ping messages to keep connection alive."""
        while self.running:
            try:
                current_time = time.time()
                if current_time - self.last_ping_time >= self.ping_interval:
                    ping_msg = {"method": "PING"}
                    logger.debug("Sending ping")
                    await self.ws.send(json.dumps(ping_msg))
                    self.last_ping_time = current_time
                await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Error in ping loop: {str(e)}")
                break

    async def receive(self) -> Optional[Dict]:
        """Receive one message and return parsed kline data, if any.

        Returns:
            A dict with timestamp/open/high/low/price/volume/type for kline
            messages; None for pongs, binary frames, unrecognised payloads,
            errors, or when not connected. ``running`` is cleared when the
            connection is found closed.
        """
        if not self.ws:
            return None
        try:
            message = await self.ws.recv()
            self.message_count += 1
            if self.message_count % 10 == 0:
                logger.info(f"Received message #{self.message_count}")
                logger.debug(f"Raw message: {message[:200]}...")
            if isinstance(message, bytes):
                return None
            data = json.loads(message)
            # Handle PONG response
            if isinstance(data, dict) and data.get('msg') == 'PONG':
                logger.debug("Received pong")
                return None
            # Handle kline data; an empty list carries nothing to parse
            if isinstance(data, dict) and isinstance(data.get('data'), list) and data['data']:
                kline = data['data'][0]
                if len(kline) >= 6:
                    kline_data = {
                        'timestamp': int(kline[0]),  # Timestamp
                        'open': float(kline[1]),     # Open
                        'high': float(kline[2]),     # High
                        'low': float(kline[3]),      # Low
                        'price': float(kline[4]),    # Close
                        'volume': float(kline[5]),   # Volume
                        'type': 'kline'
                    }
                    logger.info(f"Processed kline data: {kline_data}")
                    return kline_data
            return None
        except websockets.exceptions.ConnectionClosed:
            logger.warning("WebSocket connection closed")
            self.running = False
            return None
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...")
            return None
        except Exception as e:
            logger.error(f"Error receiving message: {str(e)}")
            return None

    async def unsubscribe(self):
        """Unsubscribe from all channels (best effort; send errors are logged)."""
        if self.ws:
            for msg in self.subscribe_msgs:
                unsub_msg = {
                    "method": "UNSUBSCRIPTION",
                    "params": msg["params"]
                }
                try:
                    await self.ws.send(json.dumps(unsub_msg))
                except Exception as e:
                    # The socket may already be closed; unsubscribing is
                    # only a courtesy, so do not propagate.
                    logger.debug(f"Unsubscribe failed: {e}")

    async def close(self):
        """Stop the keep-alive task and close the WebSocket connection."""
        self._cancel_ping_task()
        if self.ws:
            await self.unsubscribe()
            await self.ws.close()
            self.running = False
            logger.info("WebSocket connection closed")
class BinanceWebSocket:
    """Binance WebSocket client delivering real-time trade ticks for one symbol."""

    def __init__(self, symbol: str):
        """Derive the trade stream URL from ``symbol`` (e.g. ``"BTC/USDT"``)."""
        normalized = symbol.replace('/', '').lower()
        self.symbol = normalized
        self.ws = None
        self.running = False
        self.reconnect_delay = 1
        self.max_reconnect_delay = 60
        self.message_count = 0
        # Binance WebSocket configuration
        self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@trade"
        logger.info(f"Initialized Binance WebSocket for symbol: {self.symbol}")

    async def connect(self):
        """Open the stream, retrying forever with exponential backoff.

        Returns True once the connection is established.
        """
        while True:
            try:
                logger.info(f"Attempting to connect to {self.ws_url}")
                self.ws = await websockets.connect(self.ws_url)
            except Exception as err:
                logger.error(f"WebSocket connection error: {str(err)}")
                await asyncio.sleep(self.reconnect_delay)
                # Double the wait after each failure, capped at the maximum.
                self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
            else:
                logger.info("WebSocket connection established")
                self.running = True
                self.reconnect_delay = 1
                logger.info(f"Successfully connected to Binance WebSocket for {self.symbol}")
                return True

    async def receive(self) -> Optional[Dict]:
        """Receive one message and return it as a trade dict, if it is a trade.

        Returns None when not connected, for non-trade events, and on any
        error. ``running`` is cleared when the connection is found closed.
        """
        if not self.ws:
            return None
        try:
            message = await self.ws.recv()
            self.message_count += 1
            if self.message_count % 100 == 0:  # Log every 100th message to avoid spam
                logger.info(f"Received message #{self.message_count}")
                logger.debug(f"Raw message: {message[:200]}...")
            payload = json.loads(message)
            # Anything that is not a trade event is ignored
            if 'e' not in payload or payload['e'] != 'trade':
                return None
            trade_data = {
                'timestamp': payload['T'],        # Trade time
                'price': float(payload['p']),     # Price
                'volume': float(payload['q']),    # Quantity
                'type': 'trade'
            }
            logger.debug(f"Processed trade data: {trade_data}")
            return trade_data
        except websockets.exceptions.ConnectionClosed:
            logger.warning("WebSocket connection closed")
            self.running = False
            return None
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...")
            return None
        except Exception as e:
            logger.error(f"Error receiving message: {str(e)}")
            return None

    async def close(self):
        """Close the WebSocket connection, if one was opened."""
        if not self.ws:
            return
        await self.ws.close()
        self.running = False
        logger.info("WebSocket connection closed")
class ExchangeWebSocket:
    """Exchange-agnostic facade that forwards to a concrete WebSocket client."""

    def __init__(self, symbol: str, exchange: str = "binance"):
        """Pick the client for ``exchange`` ("binance" or "mexc", any case).

        Raises:
            ValueError: if the exchange is not supported.
        """
        self.symbol = symbol
        self.exchange = exchange.lower()
        self.ws = None
        # Reject unknown exchanges before touching any implementation class.
        if self.exchange not in ("binance", "mexc"):
            raise ValueError(f"Unsupported exchange: {exchange}")
        if self.exchange == "binance":
            self.ws = BinanceWebSocket(symbol)
        else:
            self.ws = MEXCWebSocket(symbol)

    async def connect(self):
        """Connect the underlying client and return its result."""
        outcome = await self.ws.connect()
        return outcome

    async def receive(self) -> Optional[Dict]:
        """Return the next parsed message from the underlying client."""
        payload = await self.ws.receive()
        return payload

    async def close(self):
        """Close the underlying client's connection."""
        await self.ws.close()

    @property
    def running(self):
        """Whether the underlying client reports itself as running."""
        if not self.ws:
            return False
        return self.ws.running
class CandleCache:
    """Bounded in-memory cache of candles for the 1s, 1m, 1h and 1d intervals.

    Each interval keeps at most ``max_candles`` rows; rows are stored as the
    ``pd.Series`` objects produced by ``DataFrame.iterrows``.
    """

    def __init__(self, max_candles: int = 2000):
        """Create one empty bounded deque per supported interval."""
        self.candles = {
            '1s': deque(maxlen=max_candles),
            '1m': deque(maxlen=max_candles),
            '1h': deque(maxlen=max_candles),
            '1d': deque(maxlen=max_candles)
        }
        logger.info("Initialized CandleCache with max candles: {}".format(max_candles))

    def add_candles(self, interval: str, new_candles: pd.DataFrame):
        """Append every row of ``new_candles`` to the cache for ``interval``.

        Unknown intervals and empty frames are ignored. No de-duplication is
        done here; use ``update_cache`` for that.
        """
        if interval in self.candles and not new_candles.empty:
            for _, row in new_candles.iterrows():
                self.candles[interval].append(row)
            logger.debug(f"Added {len(new_candles)} candles to {interval} cache")

    def get_recent_candles(self, interval: str, count: int = 500) -> pd.DataFrame:
        """Return the last ``count`` cached candles for ``interval``.

        Returns an empty DataFrame for unknown or empty intervals.
        """
        if interval in self.candles and self.candles[interval]:
            recent_candles = list(self.candles[interval])[-count:]
            return pd.DataFrame(recent_candles)
        return pd.DataFrame()

    def update_cache(self, interval: str, new_candles: pd.DataFrame):
        """Add only candles newer than the last cached one.

        ``new_candles`` must have a ``timestamp`` column. Rows whose
        timestamp is not strictly greater than the newest cached timestamp
        are skipped, so repeated updates with overlapping data do not create
        duplicates.
        """
        if interval not in self.candles:
            logger.warning(f"Invalid interval {interval} for cache update")
            return
        if new_candles.empty:
            logger.debug(f"No new candles to update {interval} cache")
            return
        # Check if timestamp column exists
        if 'timestamp' not in new_candles.columns:
            logger.warning(f"No timestamp column in new candles for {interval}")
            return
        last_cached_time = None
        if self.candles[interval]:
            last_cached_candle = self.candles[interval][-1]
            try:
                # Cached rows are pd.Series (see add_candles), not dicts.
                # Key lookup works for both, whereas a dict-only check would
                # never match and every update would re-add all candles.
                last_cached_time = last_cached_candle['timestamp']
                logger.debug(f"Last cached timestamp for {interval}: {last_cached_time}")
            except (IndexError, KeyError, TypeError) as e:
                logger.error(f"Error accessing timestamp from last cached candle: {e}")
        try:
            # Only filter if we have a valid last_cached_time
            if last_cached_time is not None:
                filtered_candles = new_candles[new_candles['timestamp'] > last_cached_time]
                logger.debug(f"Filtered {len(filtered_candles)} new candles for {interval}")
                self.add_candles(interval, filtered_candles)
            else:
                # If no previous candles, add all
                logger.debug(f"No previous candles, adding all {len(new_candles)} candles to {interval} cache")
                self.add_candles(interval, new_candles)
        except Exception as e:
            logger.error(f"Error updating cache for {interval}: {str(e)}", exc_info=True)
class RealTimeChart:
def __init__(self, symbol: str):
    """Build the Dash app and all data holders for ``symbol``.

    Historical data is loaded and the layout/callbacks are registered
    before the constructor returns.
    """
    self.symbol = symbol
    # Create a multi-page Dash app instead of a simple Dash app
    viewport_meta = {"name": "viewport", "content": "width=device-width, initial-scale=1"}
    self.app = dash.Dash(
        __name__,
        suppress_callback_exceptions=True,
        meta_tags=[viewport_meta],
    )
    self.candlestick_data = CandlestickData()
    # Store 30 minutes of ticks
    self.tick_storage = TradeTickStorage(max_age_seconds=1800)
    # Per-interval OHLCV cache, nothing cached yet
    self.ohlcv_cache = dict.fromkeys(('1s', '1m', '1h', '1d'))
    # Local candle cache
    self.candle_cache = CandleCache()
    # For fetching historical data (BinanceHistoricalData is defined outside this section)
    self.historical_data = BinanceHistoricalData()
    logger.info(f"Initializing RealTimeChart for {symbol}")
    # Load historical data for longer timeframes at startup
    self._load_historical_data()
    # Setup the multi-page layout
    self._setup_app_layout()
def _setup_app_layout(self):
    """Assemble the page shell and register every callback.

    The shell holds a header with navigation links, a URL location, a
    placeholder that is filled according to the current path, and a 500 ms
    interval timer.
    """
    symbol_path = self.symbol.replace("/", "-")

    # Style for the candlestick-interval buttons
    button_style = {
        'background-color': '#4CAF50',
        'color': 'white',
        'padding': '10px 15px',
        'margin': '5px',
        'border': 'none',
        'border-radius': '5px',
        'font-size': '14px',
        'cursor': 'pointer',
        'transition': 'background-color 0.3s',
        'font-weight': 'bold'
    }
    # Same as above, but highlighted for the selected interval
    active_button_style = dict(button_style)
    active_button_style['background-color'] = '#2E7D32'
    active_button_style['box-shadow'] = '0 0 5px #2E7D32'
    # Style for the page navigation buttons
    nav_button_style = {
        'background-color': '#3f51b5',
        'color': 'white',
        'padding': '10px 15px',
        'margin': '5px',
        'border': 'none',
        'border-radius': '5px',
        'font-size': '14px',
        'cursor': 'pointer',
        'font-weight': 'bold'
    }

    title = html.H1(f"{self.symbol} Real-Time Data", style={
        'textAlign': 'center',
        'color': '#FFFFFF',
        'fontFamily': 'Arial, sans-serif',
        'margin': '10px',
        'textShadow': '2px 2px 4px #000000'
    })
    # Navigation bar: one link per page
    nav_links = [
        dcc.Link(
            html.Button(caption, style=nav_button_style),
            href=f'/{symbol_path}/{page}',
            id=f'{page}-link'
        )
        for caption, page in (('Price Chart', 'chart'), ('Raw Ticks', 'ticks'))
    ]
    nav_bar = html.Div(nav_links, style={
        'display': 'flex',
        'justifyContent': 'center',
        'margin': '10px'
    })
    header = html.Div([title, nav_bar], style={
        'backgroundColor': '#1E1E1E',
        'padding': '10px',
        'borderRadius': '5px',
        'marginBottom': '10px',
        'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)'
    })
    # Content div will be populated based on the URL
    content_div = html.Div(id='page-content')
    # Interval component for periodic updates (every 500ms for smoother display)
    ticker = dcc.Interval(id='interval-component', interval=500, n_intervals=0)

    self.app.layout = html.Div(
        [header, dcc.Location(id='url', refresh=False), content_div, ticker],
        style={
            'backgroundColor': '#121212',
            'padding': '20px',
            'minHeight': '100vh',
            'fontFamily': 'Arial, sans-serif'
        }
    )

    # Register URL callback to update page content
    @self.app.callback(
        Output('page-content', 'children'),
        [Input('url', 'pathname')]
    )
    def display_page(pathname):
        current_path = self.symbol.replace("/", "-")
        chart_routes = (f'/{current_path}/chart', '/', f'/{current_path}')
        # A missing pathname is treated like the chart page
        if pathname is None or pathname in chart_routes:
            return self._get_chart_layout(button_style, active_button_style)
        if pathname == f'/{current_path}/ticks':
            return self._get_ticks_layout()
        return html.Div([
            html.H1('404 - Page Not Found', style={'textAlign': 'center', 'color': 'white'})
        ])

    # Register callback to update the interval selection from button clicks
    self._setup_interval_callback(button_style, active_button_style)
    # Register callback to update the chart data
    self._setup_chart_callback()
    # Register callback to update the ticks data
    self._setup_ticks_callback()
def _get_chart_layout(self, button_style, active_button_style):
    """Return the chart page: interval buttons, interval store and graph.

    The '1s' button starts out with ``active_button_style``; the others use
    ``button_style``. The store is initialised to a 1-second interval.
    """
    caption = html.Div("Candlestick Interval:", style={
        'color': '#FFFFFF',
        'marginRight': '10px',
        'fontSize': '16px',
        'fontWeight': 'bold'
    })
    # One button per selectable interval; ids follow the 'btn-<label>' scheme
    interval_buttons = [
        html.Button(
            label,
            id=f'btn-{label}',
            n_clicks=0,
            style=active_button_style if label == '1s' else button_style
        )
        for label in ('1s', '5s', '15s', '30s', '1m')
    ]
    button_row = html.Div([caption, *interval_buttons], style={
        'display': 'flex',
        'alignItems': 'center',
        'justifyContent': 'center',
        'margin': '15px',
        'backgroundColor': '#2C2C2C',
        'padding': '10px',
        'borderRadius': '5px'
    })
    # Store for current interval
    interval_store = dcc.Store(id='interval-store', data={'interval': 1})
    # Main chart
    main_chart = dcc.Graph(
        id='live-chart',
        style={
            'height': '180vh',
            'border': '1px solid #444444',
            'borderRadius': '5px',
            'boxShadow': '0 4px 8px 0 rgba(0,0,0,0.2)'
        }
    )
    return html.Div([button_row, interval_store, main_chart])
def _get_ticks_layout(self):
    """Return the raw-ticks page.

    It contains a header with a refresh button and a time-window dropdown
    (default 300 seconds), plus empty containers for the stats cards, the
    ticks table and the price-movement graph.
    """
    heading = html.H2(f"{self.symbol} Raw Tick Data (Last 5 Minutes)", style={
        'textAlign': 'center',
        'color': '#FFFFFF',
        'margin': '10px 0'
    })
    refresh_button = html.Button('Refresh Data', id='refresh-ticks-btn', n_clicks=0, style={
        'backgroundColor': '#4CAF50',
        'color': 'white',
        'padding': '10px 20px',
        'margin': '10px auto',
        'border': 'none',
        'borderRadius': '5px',
        'fontSize': '14px',
        'cursor': 'pointer',
        'display': 'block'
    })
    # Time window selector; option values are window lengths in seconds
    window_choices = (
        ('Last 1 minute', 60),
        ('Last 5 minutes', 300),
        ('Last 15 minutes', 900),
        ('Last 30 minutes', 1800),
    )
    window_dropdown = dcc.Dropdown(
        id='time-window-dropdown',
        options=[{'label': text, 'value': seconds} for text, seconds in window_choices],
        value=300,  # Default to 5 minutes
        style={'width': '200px', 'backgroundColor': '#2C2C2C', 'color': 'black'}
    )
    window_selector = html.Div([
        html.Label("Time Window:", style={'color': 'white', 'marginRight': '10px'}),
        window_dropdown
    ], style={
        'display': 'flex',
        'alignItems': 'center',
        'justifyContent': 'center',
        'margin': '10px'
    })
    # Header and controls
    controls = html.Div([heading, refresh_button, window_selector], style={
        'backgroundColor': '#2C2C2C',
        'padding': '10px',
        'borderRadius': '5px',
        'marginBottom': '15px'
    })
    # Stats cards
    stats_cards = html.Div(id='tick-stats-cards', style={
        'display': 'flex',
        'flexWrap': 'wrap',
        'justifyContent': 'space-around',
        'marginBottom': '15px'
    })
    # Ticks data table
    ticks_table = html.Div(id='ticks-table-container', style={
        'backgroundColor': '#232323',
        'padding': '10px',
        'borderRadius': '5px',
        'overflowX': 'auto'
    })
    # Price movement chart
    price_panel = html.Div([
        html.H3("Price Movement", style={
            'textAlign': 'center',
            'color': '#FFFFFF',
            'margin': '10px 0'
        }),
        dcc.Graph(id='tick-price-chart')
    ], style={
        'backgroundColor': '#232323',
        'padding': '10px',
        'borderRadius': '5px',
        'marginTop': '15px'
    })
    return html.Div([controls, stats_cards, ticks_table, price_panel])
def _setup_interval_callback(self, button_style, active_button_style):
    """Register the callback that turns button clicks into an interval.

    The callback writes ``{'interval': <seconds>}`` to the interval store
    and restyles the five buttons so that only the selected one uses
    ``active_button_style``.
    """
    # Button id -> candle interval in seconds, in on-screen order
    button_seconds = (
        ('btn-1s', 1),
        ('btn-5s', 5),
        ('btn-15s', 15),
        ('btn-30s', 30),
        ('btn-1m', 60),
    )

    def styles_for(selected_seconds):
        # One style per button; only the button matching the selection is active
        return [
            active_button_style if seconds == selected_seconds else button_style
            for _, seconds in button_seconds
        ]

    @self.app.callback(
        [Output('interval-store', 'data')]
        + [Output(button_id, 'style') for button_id, _ in button_seconds],
        [Input(button_id, 'n_clicks') for button_id, _ in button_seconds],
        [dash.dependencies.State('interval-store', 'data')]
    )
    def update_interval(n1, n5, n15, n30, n60, data):
        ctx = dash.callback_context
        if not ctx.triggered:
            # Default state (1s selected)
            return ({'interval': 1}, *styles_for(1))
        # prop_id looks like '<component id>.<property>'
        button_id = ctx.triggered[0]['prop_id'].split('.')[0]
        chosen = dict(button_seconds).get(button_id)
        if chosen is not None:
            return ({'interval': chosen}, *styles_for(chosen))
        # Default case - keep current interval and highlight appropriate button
        current_interval = data.get('interval', 1)
        return (data, *styles_for(current_interval))
def _setup_chart_callback(self):
    """Register the Dash callback that redraws the main live chart.

    The callback fires on every ``interval-component`` tick and whenever
    ``interval-store`` changes. It builds a six-row figure: candlesticks for
    the selected interval (row 1), volume bars (row 2) and one candlestick
    subchart per entry of ``self.ohlcv_cache`` (rows 3 onward). Any exception
    raised while building the figure is logged and replaced by a small figure
    that displays the error text, so the callback itself never raises.
    """
    @self.app.callback(
        Output('live-chart', 'figure'),
        [Input('interval-component', 'n_intervals'),
         Input('interval-store', 'data')]
    )
    def update_chart(n, interval_data):
        try:
            # Fall back to 1s candles if the store has no data yet.
            interval = (interval_data or {}).get('interval', 1)
            logger.debug(f"Updating chart for {self.symbol} with interval {interval}s")

            # Candles for the main chart plus the statistics shown in the info box
            df = self.tick_storage.get_candles(interval_seconds=interval)
            current_price = self.tick_storage.get_latest_price()
            price_stats = self.tick_storage.get_price_stats()
            time_stats = self.tick_storage.get_time_based_stats()
            logger.debug(f"Current price: {current_price}, Stats: {price_stats}")

            fig = make_subplots(
                rows=6, cols=1,  # Main chart, volume and four OHLCV subcharts
                vertical_spacing=0.02,  # Reduced for better use of vertical space
                subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume', '1s OHLCV', '1m OHLCV', '1h OHLCV', '1d OHLCV'),
                row_heights=[0.5, 0.1, 0.1, 0.1, 0.1, 0.1]  # Give more space to main chart
            )

            if not df.empty:
                logger.debug(f"Candles dataframe shape: {df.shape}, columns: {df.columns.tolist()}")

                # Main candlestick chart
                fig.add_trace(
                    go.Candlestick(
                        x=df['timestamp'],
                        open=df['open'],
                        high=df['high'],
                        low=df['low'],
                        close=df['close'],
                        name='Price',
                        increasing_line_color='#33CC33',  # Green
                        decreasing_line_color='#FF4136'  # Red
                    ),
                    row=1, col=1
                )

                # Volume bars, coloured by candle direction
                colors = ['#33CC33' if close_ >= open_ else '#FF4136'
                          for close_, open_ in zip(df['close'], df['open'])]
                fig.add_trace(
                    go.Bar(
                        x=df['timestamp'],
                        y=df['volume'],
                        name='Volume',
                        marker_color=colors
                    ),
                    row=2, col=1
                )

                # Horizontal line and label at the close of the last candle
                latest_price = df['close'].iloc[-1]
                fig.add_shape(
                    type="line",
                    x0=df['timestamp'].min(),
                    y0=latest_price,
                    x1=df['timestamp'].max(),
                    y1=latest_price,
                    line=dict(color="yellow", width=1, dash="dash"),
                    row=1, col=1
                )
                fig.add_annotation(
                    x=df['timestamp'].max(),
                    y=latest_price,
                    text=f"{latest_price:.2f}",
                    showarrow=False,
                    font=dict(size=14, color="yellow"),
                    xshift=50,
                    row=1, col=1
                )

                # If the latest tick price differs from the last candle close, show it too
                if current_price and abs(current_price - latest_price) > 0.01:
                    fig.add_shape(
                        type="line",
                        x0=df['timestamp'].min(),
                        y0=current_price,
                        x1=df['timestamp'].max(),
                        y1=current_price,
                        line=dict(color="cyan", width=1, dash="dot"),
                        row=1, col=1
                    )
                    fig.add_annotation(
                        x=df['timestamp'].max(),
                        y=current_price,
                        text=f"Current: {current_price:.2f}",
                        showarrow=False,
                        font=dict(size=14, color="cyan"),
                        xshift=50,
                        yshift=20,
                        row=1, col=1
                    )

                # Refresh the cached OHLCV data for every tracked interval.
                # Only existing keys are reassigned, so iterating the dict is safe.
                for interval_key in self.ohlcv_cache:
                    try:
                        new_candles = self.tick_storage.get_candles(interval_seconds=self._interval_to_seconds(interval_key))
                        if not new_candles.empty:
                            # Update the cache with new candles
                            self.candle_cache.update_cache(interval_key, new_candles)
                            # Get the updated candles from cache for display
                            self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key)
                            logger.debug(f"Updated cache for {interval_key}, now has {len(self.ohlcv_cache[interval_key])} candles")
                    except Exception as e:
                        logger.error(f"Error updating cache for {interval_key}: {str(e)}")

                # OHLCV subcharts occupy rows 3 onward, one per cache entry
                for i, (interval_key, ohlcv_df) in enumerate(self.ohlcv_cache.items(), start=3):
                    if ohlcv_df is not None and not ohlcv_df.empty:
                        fig.add_trace(
                            go.Candlestick(
                                x=ohlcv_df['timestamp'],
                                open=ohlcv_df['open'],
                                high=ohlcv_df['high'],
                                low=ohlcv_df['low'],
                                close=ohlcv_df['close'],
                                name=f'{interval_key} OHLCV',
                                increasing_line_color='#33CC33',
                                decreasing_line_color='#FF4136'
                            ),
                            row=i, col=1
                        )
            else:
                # No candles yet: show a placeholder message instead of an empty chart
                logger.warning(f"No data to display for {self.symbol} - tick count: {len(self.tick_storage.ticks)}")
                fig.add_annotation(
                    x=0.5, y=0.5,
                    text=f"Waiting for {self.symbol} data...",
                    showarrow=False,
                    font=dict(size=20, color="white"),
                    xref="paper", yref="paper"
                )

            # Build info box text with all the statistics
            info_lines = [f"<b>{self.symbol}</b>"]

            if current_price:
                info_lines.append(f"Current: <b>{current_price:.2f}</b> USDT")

            if price_stats['count'] > 0:
                # Show the covered time span as seconds, or minutes + seconds past one minute
                age_text = f"{price_stats['age_seconds']:.1f}s"
                if price_stats['age_seconds'] > 60:
                    minutes = int(price_stats['age_seconds'] / 60)
                    seconds = int(price_stats['age_seconds'] % 60)
                    age_text = f"{minutes}m {seconds}s"

                if price_stats['min'] is not None and price_stats['max'] is not None:
                    price_range = f"Range: {price_stats['min']:.2f} - {price_stats['max']:.2f}"
                    info_lines.append(price_range)

                info_lines.append(f"Ticks: {price_stats['count']} in {age_text}")

                candle_count = len(df) if not df.empty else 0
                info_lines.append(f"Candles: {candle_count} ({interval}s)")

            if time_stats and time_stats['periods']:
                info_lines.append("<b>Time-Based Stats:</b>")
                for period, stats in time_stats['periods'].items():
                    if stats['tick_count'] > 0:
                        info_lines.append(f"{period}: {stats['tick_count']} ticks, {stats['ticks_per_second']:.2f}/s")
                        if stats['min_price'] is not None and stats['max_price'] is not None:
                            # Percentage shown is the last price relative to the period minimum
                            price_change = stats['last_price'] - stats['min_price']
                            change_pct = (price_change / stats['min_price']) * 100 if stats['min_price'] > 0 else 0
                            info_lines.append(f"  Range: {stats['min_price']:.2f}-{stats['max_price']:.2f} ({change_pct:.2f}%)")

            # Info box pinned to the top-left corner of the figure
            fig.add_annotation(
                x=0.01,
                y=0.99,
                xref="paper",
                yref="paper",
                text="<br>".join(info_lines),
                showarrow=False,
                font=dict(size=12, color="white"),
                align="left",
                bgcolor="rgba(0,0,50,0.7)",
                bordercolor="#3366CC",
                borderwidth=2,
                borderpad=5,
                xanchor="left",
                yanchor="top"
            )

            # Human-readable interval for the title; unknown values fall back to "<n>s"
            interval_text = {
                1: "1 second",
                5: "5 seconds",
                15: "15 seconds",
                30: "30 seconds",
                60: "1 minute"
            }.get(interval, f"{interval}s")

            fig.update_layout(
                title_text=f"{self.symbol} Real-Time Data ({interval_text})",
                title_x=0.5,  # Center the title
                xaxis_rangeslider_visible=False,
                height=1800,  # Increased height for taller display
                template='plotly_dark',
                paper_bgcolor='rgba(0,0,0,0)',
                plot_bgcolor='rgba(25,25,50,1)',
                font=dict(family="Arial, sans-serif", size=12, color="white"),
                showlegend=True,
                legend=dict(
                    yanchor="top",
                    y=0.99,
                    xanchor="left",
                    x=0.01
                )
            )

            # Axes styling. The range slider is switched off on every x axis:
            # the layout keyword above only reaches the first one, and each
            # candlestick subchart would otherwise draw its own slider.
            fig.update_xaxes(
                showgrid=True,
                gridwidth=1,
                gridcolor='rgba(128,128,128,0.2)',
                rangeslider_visible=False
            )
            fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)')

            return fig
        except Exception as e:
            logger.error(f"Error updating chart: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())

            # Return a minimal figure carrying the error message
            fig = go.Figure()
            fig.add_annotation(
                x=0.5, y=0.5,
                text=f"Error updating chart: {str(e)}",
                showarrow=False,
                font=dict(size=14, color="red"),
                xref="paper", yref="paper"
            )
            fig.update_layout(
                height=800,
                template='plotly_dark',
                paper_bgcolor='rgba(0,0,0,0)',
                plot_bgcolor='rgba(25,25,50,1)'
            )
            return fig
def _setup_ticks_callback(self):
    """Register the Dash callbacks that drive the raw-tick views.

    Three callbacks are registered, all triggered by the refresh interval,
    the refresh button and the time-window dropdown:

    * ``update_tick_stats`` fills ``tick-stats-cards`` with summary cards.
    * ``update_ticks_table`` fills ``ticks-table-container`` with the most
      recent ticks, newest first.
    * ``update_price_chart`` draws ``tick-price-chart`` (price line, volume
      profile and per-tick volume bars).
    """
    def _ticks_in_window(time_window):
        """Return ``(window_seconds, ticks)`` for the selected look-back window.

        A missing or zero dropdown value falls back to 300 seconds.
        """
        window_seconds = time_window if time_window else 300
        now = int(time.time() * 1000)
        start_time = now - (window_seconds * 1000)
        ticks = self.tick_storage.get_ticks_from_time(start_time_ms=start_time)
        return window_seconds, ticks

    # Callback to update stats cards
    @self.app.callback(
        Output('tick-stats-cards', 'children'),
        [Input('interval-component', 'n_intervals'),
         Input('refresh-ticks-btn', 'n_clicks'),
         Input('time-window-dropdown', 'value')]
    )
    def update_tick_stats(n_intervals, n_clicks, time_window):
        _, filtered_ticks = _ticks_in_window(time_window)

        if not filtered_ticks:
            return [html.Div("No tick data available in the selected time window.",
                             style={'color': 'white', 'textAlign': 'center', 'margin': '20px'})]

        # Price statistics
        tick_count = len(filtered_ticks)
        prices = [tick['price'] for tick in filtered_ticks]
        min_price = min(prices) if prices else 0
        max_price = max(prices) if prices else 0
        avg_price = sum(prices) / len(prices) if prices else 0
        price_range = max_price - min_price

        # Volume statistics; ticks without a volume count as 0
        volumes = [tick.get('volume', 0) for tick in filtered_ticks]
        total_volume = sum(volumes)
        avg_volume = total_volume / len(volumes) if volumes else 0
        max_volume = max(volumes) if volumes else 0
        # Middle element of the sorted volumes (upper middle for even counts)
        median_volume = sorted(volumes)[len(volumes) // 2] if volumes else 0

        # Traded value in USDT (price * volume per tick)
        trade_values = [p * v for p, v in zip(prices, volumes)]
        total_value = sum(trade_values)
        avg_trade_value = total_value / len(trade_values) if trade_values else 0

        # Tick rate over the span actually covered by the filtered ticks
        first_timestamp = filtered_ticks[0]['timestamp']
        last_timestamp = filtered_ticks[-1]['timestamp']
        time_span_seconds = (last_timestamp - first_timestamp) / 1000
        ticks_per_second = tick_count / time_span_seconds if time_span_seconds > 0 else 0

        # Shared styles for the cards
        card_style = {
            'backgroundColor': '#1E2130',
            'borderRadius': '5px',
            'padding': '15px',
            'marginBottom': '10px',
            'width': '23%',
            'boxShadow': '0 2px 4px rgba(0,0,0,0.2)',
            'color': 'white',
            'display': 'flex',
            'flexDirection': 'column',
            'alignItems': 'center',
            'justifyContent': 'center'
        }
        value_style = {
            'fontSize': '24px',
            'fontWeight': 'bold',
            'color': '#4CAF50',
            'marginTop': '5px'
        }
        label_style = {
            'fontSize': '14px',
            'color': '#AAAAAA'
        }

        return [
            html.Div([
                html.Div("Tick Count", style=label_style),
                html.Div(f"{tick_count:,}", style=value_style),
                html.Div(f"{ticks_per_second:.2f} ticks/sec", style=label_style)
            ], style=card_style),
            html.Div([
                html.Div("Price Range", style=label_style),
                html.Div(f"{min_price:.2f} - {max_price:.2f}", style=value_style),
                html.Div(f"Range: {price_range:.2f}", style=label_style)
            ], style=card_style),
            html.Div([
                html.Div("Average Price", style=label_style),
                html.Div(f"{avg_price:.2f}", style=value_style),
                html.Div("USDT", style=label_style)
            ], style=card_style),
            html.Div([
                html.Div("Total Volume", style=label_style),
                html.Div(f"{total_volume:.8f}", style=value_style),
                html.Div(f"Avg: {avg_volume:.8f}", style=label_style)
            ], style=card_style),
            html.Div([
                html.Div("Max Volume", style=label_style),
                html.Div(f"{max_volume:.8f}", style=value_style),
                html.Div("Median: {:.8f}".format(median_volume), style=label_style)
            ], style=card_style),
            html.Div([
                html.Div("Total Value", style=label_style),
                html.Div(f"{total_value:.2f}", style=value_style),
                html.Div(f"Avg: {avg_trade_value:.2f}", style=label_style)
            ], style=card_style)
        ]

    # Callback to update ticks table
    @self.app.callback(
        Output('ticks-table-container', 'children'),
        [Input('interval-component', 'n_intervals'),
         Input('refresh-ticks-btn', 'n_clicks'),
         Input('time-window-dropdown', 'value')]
    )
    def update_ticks_table(n_intervals, n_clicks, time_window):
        _, filtered_ticks = _ticks_in_window(time_window)

        if not filtered_ticks:
            return html.Div("No tick data available in the selected time window.",
                            style={'color': 'white', 'textAlign': 'center', 'margin': '20px'})

        # Only the newest ticks are displayed, so format just those,
        # newest first, instead of formatting the whole window.
        display_limit = 100
        limited_ticks = []
        for tick in reversed(filtered_ticks[-display_limit:]):
            timestamp_dt = datetime.fromtimestamp(tick['timestamp'] / 1000)
            volume = tick.get('volume', 0)  # same default as the other tick callbacks
            limited_ticks.append({
                'time': timestamp_dt.strftime('%H:%M:%S.%f')[:-3],  # HH:MM:SS.mmm
                'price': f"{tick['price']:.2f}",
                'volume': f"{volume:.8f}",
                'value': f"{tick['price'] * volume:.2f}"
            })

        # Table styles
        table_style = {
            'width': '100%',
            'borderCollapse': 'collapse',
            'color': 'white',
            'fontFamily': 'monospace'
        }
        header_style = {
            'backgroundColor': '#1A1A1A',
            'fontWeight': 'bold',
            'padding': '8px',
            'textAlign': 'left',
            'borderBottom': '2px solid #444'
        }
        cell_style = {
            'padding': '6px',
            'borderBottom': '1px solid #333',
            'textAlign': 'right'
        }
        time_cell_style = {
            **cell_style,
            'textAlign': 'left'
        }

        header = html.Tr([
            html.Th("Time", style=header_style),
            html.Th("Price", style=header_style),
            html.Th("Volume", style=header_style),
            html.Th("Value (USDT)", style=header_style)
        ])

        # Rows with alternating background colours
        rows = []
        for i, tick in enumerate(limited_ticks):
            row_style = {'backgroundColor': '#1E1E1E'} if i % 2 == 0 else {'backgroundColor': '#252525'}
            rows.append(html.Tr([
                html.Td(tick['time'], style={**time_cell_style, **row_style}),
                html.Td(tick['price'], style={**cell_style, **row_style}),
                html.Td(tick['volume'], style={**cell_style, **row_style}),
                html.Td(tick['value'], style={**cell_style, **row_style})
            ]))

        return [
            html.Div([
                html.H3(f"Latest {len(limited_ticks)} Ticks (from {len(filtered_ticks)} total)",
                        style={'color': 'white', 'marginBottom': '10px', 'textAlign': 'center'}),
                html.Table([html.Thead(header), html.Tbody(rows)], style=table_style)
            ])
        ]

    # Callback to update price chart
    @self.app.callback(
        Output('tick-price-chart', 'figure'),
        [Input('interval-component', 'n_intervals'),
         Input('refresh-ticks-btn', 'n_clicks'),
         Input('time-window-dropdown', 'value')]
    )
    def update_price_chart(n_intervals, n_clicks, time_window):
        window_seconds, filtered_ticks = _ticks_in_window(time_window)

        # Two stacked subplots: price on top, volume below
        fig = make_subplots(
            rows=2, cols=1,
            shared_xaxes=True,
            vertical_spacing=0.03,
            row_heights=[0.7, 0.3],
            subplot_titles=(f"Price Movement (Last {window_seconds // 60} minutes)", "Volume")
        )

        if not filtered_ticks:
            fig.add_annotation(
                x=0.5, y=0.5,
                text="No tick data available in the selected time window.",
                showarrow=False,
                font=dict(size=14, color="white"),
                xref="paper", yref="paper"
            )
        else:
            # Convert timestamps to datetime for better x-axis display
            timestamps = [datetime.fromtimestamp(tick['timestamp'] / 1000) for tick in filtered_ticks]
            prices = [tick['price'] for tick in filtered_ticks]
            volumes = [tick.get('volume', 0) for tick in filtered_ticks]

            # Needed by the volume profile below, so it must be known before
            # the profile is built.
            latest_price = prices[-1]
            max_volume = max(volumes)

            # Price line
            fig.add_trace(
                go.Scatter(
                    x=timestamps,
                    y=prices,
                    mode='lines',
                    name='Price',
                    line=dict(color='#4CAF50', width=1.5)
                ),
                row=1, col=1
            )

            # Volume profile (volume accumulated per price bin); only with enough data
            if len(prices) > 5:
                price_min = min(prices)
                price_max = max(prices)
                # Roughly 20 bins across the price range, at least 0.01 wide
                bin_size = max(0.01, (price_max - price_min) / 20)

                volume_by_price = {}
                for p, v in zip(prices, volumes):
                    bin_key = round(p / bin_size) * bin_size
                    volume_by_price[bin_key] = volume_by_price.get(bin_key, 0) + v

                sorted_bins = sorted(volume_by_price.items())
                profile_prices = [p for p, _ in sorted_bins]
                profile_volumes = [v for _, v in sorted_bins]

                # NOTE(review): this horizontal bar trace uses volume on the x axis
                # of the same subplot whose x axis otherwise holds timestamps -
                # confirm the rendering is as intended.
                fig.add_trace(
                    go.Bar(
                        y=profile_prices,
                        x=profile_volumes,
                        orientation='h',
                        name='Volume Profile',
                        marker=dict(
                            color=['#33CC33' if p <= latest_price else '#FF4136' for p in profile_prices],
                            opacity=0.5
                        ),
                        showlegend=True,
                        hovertemplate='Price: %{y:.2f}<br>Volume: %{x:.8f}<extra></extra>'
                    ),
                    row=1, col=1
                )

                # Line marking the latest price across the profile
                fig.add_shape(
                    type="line",
                    y0=latest_price, y1=latest_price,
                    x0=0, x1=max(profile_volumes) * 1.1,
                    line=dict(color="yellow", width=1, dash="dash"),
                    row=1, col=1
                )

            # Per-tick volume bars: green when the price rose or held versus the
            # previous tick (and for the first tick), red when it fell.
            colors = ['#33CC33'] + [
                '#33CC33' if current >= previous else '#FF4136'
                for previous, current in zip(prices, prices[1:])
            ]
            fig.add_trace(
                go.Bar(
                    x=timestamps,
                    y=volumes,
                    name='Volume',
                    marker=dict(color=colors)
                ),
                row=2, col=1
            )

            # Label the latest price on the price subplot
            fig.add_annotation(
                x=timestamps[-1],
                y=latest_price,
                text=f"{latest_price:.2f}",
                showarrow=True,
                arrowhead=1,
                arrowsize=1,
                arrowwidth=2,
                arrowcolor="#4CAF50",
                font=dict(size=12, color="#4CAF50"),
                xshift=50,
                row=1, col=1
            )

            # Label the largest single-tick volume on the volume subplot
            fig.add_annotation(
                x=timestamps[-1],
                y=max_volume,
                text=f"Max: {max_volume:.8f}",
                showarrow=False,
                font=dict(size=10, color="rgba(128, 128, 255, 1)"),
                xshift=50,
                row=2, col=1
            )

        # Update layout
        fig.update_layout(
            title_text=f"{self.symbol} Tick Data",
            title_x=0.5,
            template='plotly_dark',
            paper_bgcolor='rgba(0,0,0,0)',
            plot_bgcolor='rgba(25,25,50,1)',
            height=600,  # Increased height for better visualization
            margin=dict(l=40, r=40, t=80, b=40),
            legend=dict(
                orientation="h",
                yanchor="bottom",
                y=1.02,
                xanchor="right",
                x=1
            ),
            hovermode="x unified"  # Show all data points at the same x-coordinate
        )

        # Style every x axis and keep range sliders hidden
        fig.update_xaxes(
            showgrid=True,
            gridwidth=1,
            gridcolor='rgba(128,128,128,0.2)',
            rangeslider_visible=False
        )

        # Style and title the two y axes
        fig.update_yaxes(
            title_text="Price (USDT)",
            showgrid=True,
            gridwidth=1,
            gridcolor='rgba(128,128,128,0.2)',
            row=1, col=1
        )
        fig.update_yaxes(
            title_text="Volume",
            showgrid=True,
            gridwidth=1,
            gridcolor='rgba(128,128,128,0.2)',
            row=2, col=1
        )

        return fig
def _interval_to_seconds(self, interval_key: str) -> int:
"""Convert interval key to seconds"""
mapping = {
'1s': 1,
'1m': 60,
'1h': 3600,
'1d': 86400
}
return mapping.get(interval_key, 1)
async def start_websocket(self):
    """Stream live data for ``self.symbol`` into tick storage, reconnecting forever.

    Outer loop: (re)connect to the exchange. Failed connects back off by
    5 seconds per attempt (capped at 60); after ``max_attempts`` consecutive
    failures it waits 120 seconds and restarts the count.

    Inner loop: receive messages, store each one in ``self.tick_storage`` and
    ``self.candlestick_data``, and log throughput every 10 seconds and a
    price/volume status report every 60 seconds. When the connection drops or
    an error occurs, the socket is closed and a reconnect is attempted after
    5 seconds. The coroutine only ends when it is cancelled.
    """
    ws = ExchangeWebSocket(self.symbol)
    connection_attempts = 0
    max_attempts = 10  # Maximum connection attempts before longer waiting period

    while True:  # Keep trying to maintain connection
        connection_attempts += 1
        if not await ws.connect():
            logger.error(f"Failed to connect to exchange for {self.symbol}")

            # Pick the wait first so the log line reports the wait actually used
            take_long_break = connection_attempts >= max_attempts
            if take_long_break:
                logger.warning(f"Reached {max_attempts} connection attempts, taking a longer break")
                wait_time = 120  # 2 minutes wait after max attempts
            else:
                # Gradually increase wait time based on number of connection failures
                wait_time = min(5 * connection_attempts, 60)  # Cap at 60 seconds

            logger.warning(f"Waiting {wait_time} seconds before retry (attempt {connection_attempts})")
            await asyncio.sleep(wait_time)
            if take_long_break:
                connection_attempts = 0  # Reset counter
            continue

        # Successfully connected
        connection_attempts = 0

        try:
            logger.info(f"WebSocket connected for {self.symbol}, beginning data collection")
            tick_count = 0
            last_tick_count_log = time.time()
            last_status_report = time.time()

            # Track stats for reporting
            price_min = float('inf')
            price_max = float('-inf')
            price_last = None
            volume_total = 0
            start_collection_time = time.time()

            while True:
                if not ws.running:
                    logger.warning(f"WebSocket connection lost for {self.symbol}, breaking loop")
                    break

                data = await ws.receive()
                if data:
                    if data.get('type') == 'kline':
                        # Kline messages also carry open/high/low
                        trade_data = {
                            'timestamp': data['timestamp'],
                            'price': data['price'],
                            'volume': data['volume'],
                            'open': data['open'],
                            'high': data['high'],
                            'low': data['low']
                        }
                        logger.debug(f"Received kline data: {data}")
                    else:
                        # Plain trade message
                        trade_data = {
                            'timestamp': data['timestamp'],
                            'price': data['price'],
                            'volume': data['volume']
                        }

                    # Update running stats for the status report
                    price = trade_data['price']
                    volume = trade_data['volume']
                    price_min = min(price_min, price)
                    price_max = max(price_max, price)
                    price_last = price
                    volume_total += volume

                    # Store raw tick in the tick storage
                    self.tick_storage.add_tick(trade_data)
                    tick_count += 1

                    # Also update the old candlestick data for backward compatibility
                    self.candlestick_data.update_from_trade(trade_data)

                    # Log tick counts periodically
                    current_time = time.time()
                    if current_time - last_tick_count_log >= 10:  # Log every 10 seconds
                        elapsed = current_time - last_tick_count_log
                        tps = tick_count / elapsed if elapsed > 0 else 0
                        logger.info(f"{self.symbol}: Collected {tick_count} ticks in last {elapsed:.1f}s ({tps:.2f} ticks/sec), total: {len(self.tick_storage.ticks)}")
                        last_tick_count_log = current_time
                        tick_count = 0

                        # Check if ticks are being converted to candles
                        if len(self.tick_storage.ticks) > 0:
                            sample_df = self.tick_storage.get_candles(interval_seconds=1)
                            logger.info(f"{self.symbol}: Sample candle count: {len(sample_df)}")

                    # Periodic status report (every 60 seconds)
                    if current_time - last_status_report >= 60:
                        elapsed_total = current_time - start_collection_time
                        logger.info(f"{self.symbol} Status Report:")
                        logger.info(f"  Collection time: {elapsed_total:.1f} seconds")
                        logger.info(f"  Price range: {price_min:.2f} - {price_max:.2f} (last: {price_last:.2f})")
                        logger.info(f"  Total volume: {volume_total:.8f}")
                        logger.info(f"  Active ticks in storage: {len(self.tick_storage.ticks)}")

                        # Reset stats for next period, seeding the range with the last price
                        last_status_report = current_time
                        price_min = float('inf') if price_last is None else price_last
                        price_max = float('-inf') if price_last is None else price_last
                        volume_total = 0

                # Short pause between receive attempts
                await asyncio.sleep(0.01)

        except websockets.exceptions.ConnectionClosed as e:
            logger.error(f"WebSocket connection closed for {self.symbol}: {str(e)}")
        except Exception as e:
            logger.error(f"Error in WebSocket loop for {self.symbol}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
        finally:
            logger.info(f"Closing WebSocket connection for {self.symbol}")
            await ws.close()

        logger.info(f"Waiting 5 seconds before reconnecting {self.symbol} WebSocket...")
        await asyncio.sleep(5)
def run(self, host='localhost', port=8050):
    """Start the Dash server, retrying once on ``port + 100`` if the port is busy.

    Args:
        host: Interface to bind the server to.
        port: Preferred port.

    Raises:
        Exception: Any startup error other than "address already in use" is
            logged and re-raised. A failure on the alternative port is logged
            and not re-raised.
    """
    import errno

    try:
        logger.info(f"Starting Dash server for {self.symbol} on {host}:{port}")
        self.app.run(debug=False, host=host, port=port)
    except Exception as e:
        logger.error(f"Error running Dash server on port {port}: {str(e)}")

        # Recognise a busy port by errno as well as by message text: the
        # message wording depends on platform and locale.
        port_in_use = (
            getattr(e, 'errno', None) == errno.EADDRINUSE
            or "Address already in use" in str(e)
        )
        if not port_in_use:
            # Re-raise other exceptions
            raise

        # Try an alternative port if the primary one is in use
        alt_port = port + 100
        logger.warning(f"Port {port} is busy, trying alternative port {alt_port}")
        try:
            self.app.run(debug=False, host=host, port=alt_port)
        except Exception as alt_e:
            logger.error(f"Error running Dash server on alternative port {alt_port}: {str(alt_e)}")
def _load_historical_data(self):
    """Seed the 1m, 1h and 1d candle caches with historical candles.

    For each timeframe, up to 500 candles are requested from
    ``self.historical_data``. Every returned row is appended to
    ``self.candle_cache.candles[<timeframe>]`` as a dict, and
    ``self.ohlcv_cache`` is refreshed from the candle cache. Errors are
    logged with a traceback and not propagated.
    """
    try:
        logger.info(f"Loading historical data for {self.symbol}...")

        # (cache key, candle length in seconds) for each timeframe to fetch
        timeframes = (('1m', 60), ('1h', 3600), ('1d', 86400))

        for interval_key, interval_seconds in timeframes:
            historical_df = self.historical_data.get_historical_candles(
                symbol=self.symbol,
                interval_seconds=interval_seconds,
                limit=500  # Get 500 candles
            )

            if historical_df.empty:
                logger.warning(f"No historical data available for {self.symbol} {interval_key}")
                continue

            logger.info(f"Loaded {len(historical_df)} historical candles for {self.symbol} {interval_key}")

            # Store each candle as a plain dict in the cache
            bucket = self.candle_cache.candles[interval_key]
            for _, candle_row in historical_df.iterrows():
                bucket.append(candle_row.to_dict())

            # Refresh the frame the chart displays for this timeframe
            self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key)
            logger.info(f"Added {len(historical_df)} candles to {interval_key} cache")
    except Exception as e:
        logger.error(f"Error loading historical data: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
class BinanceHistoricalData:
    """Fetch historical candle data from Binance, with a CSV cache on disk.

    Candles are requested from the public klines endpoint and cached as CSV
    files in a ``cache`` directory under the current working directory.
    """

    # Seconds to wait for the Binance API before giving up on a request.
    # Without a timeout a stalled connection would block the caller forever.
    REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self):
        self.base_url = "https://api.binance.com/api/v3/klines"
        # Create a cache directory if it doesn't exist
        self.cache_dir = os.path.join(os.getcwd(), "cache")
        os.makedirs(self.cache_dir, exist_ok=True)
        logger.info(f"Initialized BinanceHistoricalData with cache directory: {self.cache_dir}")

    def _get_interval_string(self, interval_seconds: int) -> str:
        """Convert interval seconds to a Binance interval string.

        Only 60, 3600 and 86400 are recognised; anything else logs a warning
        and falls back to "1m".
        """
        known_intervals = {60: "1m", 3600: "1h", 86400: "1d"}
        try:
            return known_intervals[interval_seconds]
        except KeyError:
            logger.warning(f"Unrecognized interval {interval_seconds}s, defaulting to 1m")
            return "1m"

    def _get_cache_filename(self, symbol: str, interval: str) -> str:
        """Return the cache file path for the symbol and interval."""
        # Slashes are not valid in file names, e.g. "ETH/USDT" -> "ETH_USDT"
        safe_symbol = symbol.replace("/", "_")
        return os.path.join(self.cache_dir, f"{safe_symbol}_{interval}_candles.csv")

    def _load_from_cache(self, symbol: str, interval: str) -> Optional[pd.DataFrame]:
        """Load candle data from cache if available and not expired.

        Returns:
            The cached candles, or ``None`` when the file is missing, older
            than its maximum age, or cannot be read.
        """
        filename = self._get_cache_filename(symbol, interval)

        if not os.path.exists(filename):
            logger.debug(f"No cache file found for {symbol} {interval}")
            return None

        # Cache lifetime: 1 day for daily candles, 1 hour for everything else
        file_age = time.time() - os.path.getmtime(filename)
        max_age = 86400 if interval == "1d" else 3600
        if file_age > max_age:
            logger.debug(f"Cache for {symbol} {interval} is expired ({file_age:.1f}s old)")
            return None

        try:
            df = pd.read_csv(filename)
            # Convert timestamp string back to datetime
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            logger.info(f"Loaded {len(df)} candles from cache for {symbol} {interval}")
            return df
        except Exception as e:
            logger.error(f"Error loading from cache: {str(e)}")
            return None

    def _save_to_cache(self, df: pd.DataFrame, symbol: str, interval: str) -> bool:
        """Save candle data to cache.

        Returns:
            ``True`` if the file was written, ``False`` if ``df`` is empty or
            writing failed.
        """
        if df.empty:
            logger.warning(f"No data to cache for {symbol} {interval}")
            return False

        filename = self._get_cache_filename(symbol, interval)
        try:
            df.to_csv(filename, index=False)
            logger.info(f"Cached {len(df)} candles for {symbol} {interval} to {filename}")
            return True
        except Exception as e:
            logger.error(f"Error saving to cache: {str(e)}")
            return False

    def get_historical_candles(self, symbol: str, interval_seconds: int, limit: int = 500) -> pd.DataFrame:
        """Get historical candle data for the specified symbol and interval.

        A fresh cache holding at least ``limit`` candles is used directly.
        Otherwise the candles are fetched from the API and cached.

        Args:
            symbol: Trading pair such as "ETH/USDT"; the slash is dropped for
                the API request.
            interval_seconds: Candle length in seconds (60, 3600 or 86400).
            limit: Number of candles to return.

        Returns:
            A DataFrame with columns timestamp, open, high, low, close and
            volume. If the request fails, whatever fresh cached candles exist
            are returned; an empty DataFrame is returned when nothing is
            available.
        """
        # Convert to Binance format
        clean_symbol = symbol.replace("/", "")
        interval = self._get_interval_string(interval_seconds)

        # Try to load from cache first
        cached_data = self._load_from_cache(symbol, interval)
        if cached_data is not None and len(cached_data) >= limit:
            return cached_data.tail(limit)

        # Fetch from API if not cached or insufficient
        try:
            logger.info(f"Fetching {limit} historical candles for {symbol} ({interval}) from Binance API")

            params = {
                "symbol": clean_symbol,
                "interval": interval,
                "limit": limit
            }
            response = requests.get(self.base_url, params=params, timeout=self.REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()  # Raise exception for HTTP errors

            candles = response.json()
            if not candles:
                logger.warning(f"No candles returned from Binance for {symbol} {interval}")
                return pd.DataFrame()

            # Binance returns one list per candle:
            # [open time (ms), open, high, low, close, volume, ...rest ignored]
            df = pd.DataFrame(candles, columns=[
                "timestamp", "open", "high", "low", "close", "volume",
                "close_time", "quote_asset_volume", "number_of_trades",
                "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
            ])

            # Convert types: prices and volume arrive as strings
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            for col in ["open", "high", "low", "close", "volume"]:
                df[col] = df[col].astype(float)

            # Keep only needed columns
            df = df[["timestamp", "open", "high", "low", "close", "volume"]]

            # Cache the results
            self._save_to_cache(df, symbol, interval)

            logger.info(f"Successfully fetched {len(df)} candles for {symbol} {interval}")
            return df
        except Exception as e:
            logger.error(f"Error fetching historical data for {symbol} {interval}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())

            # A fresh but short cache is still better than returning nothing
            if cached_data is not None and not cached_data.empty:
                logger.warning(f"Using {len(cached_data)} cached candles for {symbol} {interval} after fetch failure")
                return cached_data.tail(limit)
            return pd.DataFrame()
async def main():
    """Run one chart per symbol: a websocket task plus a Dash server thread.

    Each symbol gets a ``RealTimeChart`` whose websocket coroutine is scheduled
    on the event loop and whose Dash server runs in a daemon thread on its own
    port (8050, 8051, ...). The coroutine then idles until it is interrupted,
    and finally cancels the websocket tasks and waits for them to finish.
    """
    symbols = ["ETH/USDT", "BTC/USDT"]
    logger.info(f"Starting application for symbols: {symbols}")

    charts = []
    websocket_tasks = []

    # One chart and one websocket task per symbol
    for symbol in symbols:
        chart = RealTimeChart(symbol)
        charts.append(chart)
        websocket_tasks.append(asyncio.create_task(chart.start_websocket()))

    # Dash blocks, so each server runs in its own daemon thread
    server_threads = []
    for port, chart in enumerate(charts, start=8050):  # Use different ports for each chart
        logger.info(f"Starting chart for {chart.symbol} on port {port}")
        thread = Thread(target=chart.run, kwargs={'port': port})
        thread.daemon = True
        thread.start()
        server_threads.append(thread)
        logger.info(f"Thread started for {chart.symbol} on port {port}")

    try:
        # Keep the main task running
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
    finally:
        # Cancel every websocket task and wait until it has actually stopped
        for task in websocket_tasks:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
if __name__ == "__main__":
    # Script entry point: run the async application until the user interrupts it.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop; log it instead of showing a traceback
        logger.info("Application terminated by user")