gogo2/realtime.py
2025-03-18 23:03:30 +02:00

823 lines
33 KiB
Python

import asyncio
import json
import logging
import datetime
from typing import Dict, List, Optional
import websockets
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import dash
from dash import html, dcc
from dash.dependencies import Input, Output
import pandas as pd
import numpy as np
from collections import deque
import time
from threading import Thread
# Configure logging with more detailed format
logging.basicConfig(
level=logging.DEBUG, # Changed to DEBUG for more detailed logs
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('realtime_chart.log')
]
)
logger = logging.getLogger(__name__)
class TradeTickStorage:
"""Store and manage raw trade ticks for display and candle formation"""
def __init__(self, max_age_seconds: int = 300): # 5 minutes by default
self.ticks = []
self.max_age_seconds = max_age_seconds
logger.info(f"Initialized TradeTickStorage with max age: {max_age_seconds} seconds")
def add_tick(self, tick: Dict):
"""Add a new trade tick to storage"""
self.ticks.append(tick)
logger.debug(f"Added tick: {tick}, total ticks: {len(self.ticks)}")
# Clean up old ticks
self._cleanup()
def _cleanup(self):
"""Remove ticks older than max_age_seconds"""
now = int(time.time() * 1000) # Current time in ms
cutoff = now - (self.max_age_seconds * 1000)
old_count = len(self.ticks)
self.ticks = [tick for tick in self.ticks if tick['timestamp'] >= cutoff]
if old_count > len(self.ticks):
logger.debug(f"Cleaned up {old_count - len(self.ticks)} old ticks")
def get_ticks_as_df(self) -> pd.DataFrame:
"""Return ticks as a DataFrame"""
if not self.ticks:
logger.warning("No ticks available for DataFrame conversion")
return pd.DataFrame()
df = pd.DataFrame(self.ticks)
if not df.empty:
logger.debug(f"Converting timestamps for {len(df)} ticks")
# Ensure timestamp column exists
if 'timestamp' not in df.columns:
logger.error("Tick data missing timestamp column")
return pd.DataFrame()
# Check timestamp datatype before conversion
sample_ts = df['timestamp'].iloc[0] if len(df) > 0 else None
logger.debug(f"Sample timestamp before conversion: {sample_ts}, type: {type(sample_ts)}")
# Convert timestamps to datetime
try:
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
logger.debug(f"Timestamps converted to datetime successfully")
if len(df) > 0:
logger.debug(f"Sample converted timestamp: {df['timestamp'].iloc[0]}")
except Exception as e:
logger.error(f"Error converting timestamps: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return pd.DataFrame()
return df
def get_candles(self, interval_seconds: int = 1) -> pd.DataFrame:
"""Convert ticks to OHLCV candles at specified interval"""
if not self.ticks:
logger.warning("No ticks available for candle formation")
return pd.DataFrame()
# Ensure ticks are up to date
self._cleanup()
# Convert to DataFrame
df = self.get_ticks_as_df()
if df.empty:
logger.warning("Tick DataFrame is empty after conversion")
return pd.DataFrame()
logger.info(f"Preparing to create candles from {len(df)} ticks")
try:
# Use timestamp column for resampling
df = df.set_index('timestamp')
# Create interval string for resampling - use 's' instead of deprecated 'S'
interval_str = f'{interval_seconds}s'
# Resample to create OHLCV candles
logger.debug(f"Resampling with interval: {interval_str}")
candles = df.resample(interval_str).agg({
'price': ['first', 'max', 'min', 'last'],
'volume': 'sum'
})
# Check if resampling produced any data
if candles.empty:
logger.warning("Resampling produced empty dataframe - check timestamp distribution")
# Show timestamp ranges to diagnose potential resampling issues
if not df.empty:
min_time = df.index.min()
max_time = df.index.max()
logger.info(f"Tick timestamp range: {min_time} to {max_time}")
return pd.DataFrame()
# Flatten MultiIndex columns
candles.columns = ['open', 'high', 'low', 'close', 'volume']
# Reset index to get timestamp as column
candles = candles.reset_index()
# Ensure no NaN values
candles = candles.dropna()
logger.debug(f"Generated {len(candles)} candles from {len(self.ticks)} ticks")
return candles
except Exception as e:
logger.error(f"Error in candle formation: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return pd.DataFrame()
class CandlestickData:
def __init__(self, max_length: int = 300):
self.timestamps = deque(maxlen=max_length)
self.opens = deque(maxlen=max_length)
self.highs = deque(maxlen=max_length)
self.lows = deque(maxlen=max_length)
self.closes = deque(maxlen=max_length)
self.volumes = deque(maxlen=max_length)
self.current_candle = {
'timestamp': None,
'open': None,
'high': None,
'low': None,
'close': None,
'volume': 0
}
self.candle_interval = 1 # 1 second by default
def update_from_trade(self, trade: Dict):
timestamp = trade['timestamp']
price = trade['price']
volume = trade.get('volume', 0)
# Round timestamp to nearest candle interval
candle_timestamp = int(timestamp / (self.candle_interval * 1000)) * (self.candle_interval * 1000)
if self.current_candle['timestamp'] != candle_timestamp:
# Save current candle if it exists
if self.current_candle['timestamp'] is not None:
self.timestamps.append(self.current_candle['timestamp'])
self.opens.append(self.current_candle['open'])
self.highs.append(self.current_candle['high'])
self.lows.append(self.current_candle['low'])
self.closes.append(self.current_candle['close'])
self.volumes.append(self.current_candle['volume'])
logger.debug(f"New candle saved: {self.current_candle}")
# Start new candle
self.current_candle = {
'timestamp': candle_timestamp,
'open': price,
'high': price,
'low': price,
'close': price,
'volume': volume
}
logger.debug(f"New candle started: {self.current_candle}")
else:
# Update current candle
if self.current_candle['high'] is None or price > self.current_candle['high']:
self.current_candle['high'] = price
if self.current_candle['low'] is None or price < self.current_candle['low']:
self.current_candle['low'] = price
self.current_candle['close'] = price
self.current_candle['volume'] += volume
logger.debug(f"Updated current candle: {self.current_candle}")
def get_dataframe(self) -> pd.DataFrame:
# Include current candle in the dataframe if it exists
timestamps = list(self.timestamps)
opens = list(self.opens)
highs = list(self.highs)
lows = list(self.lows)
closes = list(self.closes)
volumes = list(self.volumes)
if self.current_candle['timestamp'] is not None:
timestamps.append(self.current_candle['timestamp'])
opens.append(self.current_candle['open'])
highs.append(self.current_candle['high'])
lows.append(self.current_candle['low'])
closes.append(self.current_candle['close'])
volumes.append(self.current_candle['volume'])
df = pd.DataFrame({
'timestamp': timestamps,
'open': opens,
'high': highs,
'low': lows,
'close': closes,
'volume': volumes
})
if not df.empty:
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
return df
class MEXCWebSocket:
"""MEXC-specific WebSocket implementation"""
def __init__(self, symbol: str):
self.symbol = symbol.replace('/', '').upper()
self.ws = None
self.running = False
self.reconnect_delay = 1
self.max_reconnect_delay = 60
self.ping_interval = 20
self.last_ping_time = 0
self.message_count = 0
# MEXC WebSocket configuration
self.ws_url = "wss://wbs-api.mexc.com/ws"
self.ws_sub_params = [
f"spot@public.kline.v3.api@{self.symbol}@Min1"
]
self.subscribe_msgs = [
{
"method": "SUBSCRIPTION",
"params": self.ws_sub_params
}
]
logger.info(f"Initialized MEXC WebSocket for symbol: {self.symbol}")
logger.debug(f"Subscribe messages: {json.dumps(self.subscribe_msgs)}")
async def connect(self):
while True:
try:
logger.info(f"Attempting to connect to {self.ws_url}")
self.ws = await websockets.connect(self.ws_url)
logger.info("WebSocket connection established")
# Subscribe to the streams
for msg in self.subscribe_msgs:
logger.info(f"Sending subscription message: {json.dumps(msg)}")
await self.ws.send(json.dumps(msg))
# Wait for subscription confirmation
response = await self.ws.recv()
logger.info(f"Subscription response: {response}")
if "Not Subscribed" in response:
logger.error(f"Subscription error: {response}")
await self.unsubscribe()
await self.close()
return False
self.running = True
self.reconnect_delay = 1
logger.info(f"Successfully connected to MEXC WebSocket for {self.symbol}")
# Start ping task
asyncio.create_task(self.ping_loop())
return True
except Exception as e:
logger.error(f"WebSocket connection error: {str(e)}")
await self.unsubscribe()
await asyncio.sleep(self.reconnect_delay)
self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
continue
async def ping_loop(self):
"""Send ping messages to keep connection alive"""
while self.running:
try:
current_time = time.time()
if current_time - self.last_ping_time >= self.ping_interval:
ping_msg = {"method": "PING"}
logger.debug("Sending ping")
await self.ws.send(json.dumps(ping_msg))
self.last_ping_time = current_time
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error in ping loop: {str(e)}")
break
async def receive(self) -> Optional[Dict]:
if not self.ws:
return None
try:
message = await self.ws.recv()
self.message_count += 1
if self.message_count % 10 == 0:
logger.info(f"Received message #{self.message_count}")
logger.debug(f"Raw message: {message[:200]}...")
if isinstance(message, bytes):
return None
data = json.loads(message)
# Handle PONG response
if isinstance(data, dict) and data.get('msg') == 'PONG':
logger.debug("Received pong")
return None
# Handle kline data
if isinstance(data, dict) and 'data' in data and isinstance(data['data'], list):
kline = data['data'][0]
if len(kline) >= 6:
kline_data = {
'timestamp': int(kline[0]), # Timestamp
'open': float(kline[1]), # Open
'high': float(kline[2]), # High
'low': float(kline[3]), # Low
'price': float(kline[4]), # Close
'volume': float(kline[5]), # Volume
'type': 'kline'
}
logger.info(f"Processed kline data: {kline_data}")
return kline_data
return None
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket connection closed")
self.running = False
return None
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...")
return None
except Exception as e:
logger.error(f"Error receiving message: {str(e)}")
return None
async def unsubscribe(self):
"""Unsubscribe from all channels"""
if self.ws:
for msg in self.subscribe_msgs:
unsub_msg = {
"method": "UNSUBSCRIPTION",
"params": msg["params"]
}
try:
await self.ws.send(json.dumps(unsub_msg))
except:
pass
async def close(self):
"""Close the WebSocket connection"""
if self.ws:
await self.unsubscribe()
await self.ws.close()
self.running = False
logger.info("WebSocket connection closed")
class BinanceWebSocket:
"""Binance WebSocket implementation for real-time tick data"""
def __init__(self, symbol: str):
self.symbol = symbol.replace('/', '').lower()
self.ws = None
self.running = False
self.reconnect_delay = 1
self.max_reconnect_delay = 60
self.message_count = 0
# Binance WebSocket configuration
self.ws_url = f"wss://stream.binance.com:9443/ws/{self.symbol}@trade"
logger.info(f"Initialized Binance WebSocket for symbol: {self.symbol}")
async def connect(self):
while True:
try:
logger.info(f"Attempting to connect to {self.ws_url}")
self.ws = await websockets.connect(self.ws_url)
logger.info("WebSocket connection established")
self.running = True
self.reconnect_delay = 1
logger.info(f"Successfully connected to Binance WebSocket for {self.symbol}")
return True
except Exception as e:
logger.error(f"WebSocket connection error: {str(e)}")
await asyncio.sleep(self.reconnect_delay)
self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
continue
async def receive(self) -> Optional[Dict]:
if not self.ws:
return None
try:
message = await self.ws.recv()
self.message_count += 1
if self.message_count % 100 == 0: # Log every 100th message to avoid spam
logger.info(f"Received message #{self.message_count}")
logger.debug(f"Raw message: {message[:200]}...")
data = json.loads(message)
# Process trade data
if 'e' in data and data['e'] == 'trade':
trade_data = {
'timestamp': data['T'], # Trade time
'price': float(data['p']), # Price
'volume': float(data['q']), # Quantity
'type': 'trade'
}
logger.debug(f"Processed trade data: {trade_data}")
return trade_data
return None
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket connection closed")
self.running = False
return None
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...")
return None
except Exception as e:
logger.error(f"Error receiving message: {str(e)}")
return None
async def close(self):
"""Close the WebSocket connection"""
if self.ws:
await self.ws.close()
self.running = False
logger.info("WebSocket connection closed")
class ExchangeWebSocket:
"""Generic WebSocket interface for cryptocurrency exchanges"""
def __init__(self, symbol: str, exchange: str = "binance"):
self.symbol = symbol
self.exchange = exchange.lower()
self.ws = None
# Initialize the appropriate WebSocket implementation
if self.exchange == "binance":
self.ws = BinanceWebSocket(symbol)
elif self.exchange == "mexc":
self.ws = MEXCWebSocket(symbol)
else:
raise ValueError(f"Unsupported exchange: {exchange}")
async def connect(self):
"""Connect to the exchange WebSocket"""
return await self.ws.connect()
async def receive(self) -> Optional[Dict]:
"""Receive data from the WebSocket"""
return await self.ws.receive()
async def close(self):
"""Close the WebSocket connection"""
await self.ws.close()
@property
def running(self):
"""Check if the WebSocket is running"""
return self.ws.running if self.ws else False
class RealTimeChart:
def __init__(self, symbol: str):
self.symbol = symbol
self.app = dash.Dash(__name__)
self.candlestick_data = CandlestickData()
self.tick_storage = TradeTickStorage(max_age_seconds=300) # Store 5 minutes of ticks
logger.info(f"Initializing RealTimeChart for {symbol}")
# Initialize the layout with improved styling
self.app.layout = html.Div([
html.H1(f"{symbol} Real-Time Price", style={
'textAlign': 'center',
'color': '#2c3e50',
'fontFamily': 'Arial, sans-serif',
'marginTop': '20px'
}),
html.Div([
html.Button('1s', id='btn-1s', n_clicks=0, style={'margin': '5px'}),
html.Button('5s', id='btn-5s', n_clicks=0, style={'margin': '5px'}),
html.Button('15s', id='btn-15s', n_clicks=0, style={'margin': '5px'}),
html.Button('30s', id='btn-30s', n_clicks=0, style={'margin': '5px'}),
html.Button('1m', id='btn-1m', n_clicks=0, style={'margin': '5px'}),
], style={'textAlign': 'center', 'margin': '10px'}),
dcc.Store(id='interval-store', data={'interval': 1}), # Store for current interval
dcc.Graph(
id='live-chart',
style={'height': '80vh'}
),
dcc.Interval(
id='interval-component',
interval=500, # Update every 500ms for smoother display
n_intervals=0
)
], style={
'backgroundColor': '#f8f9fa',
'padding': '20px'
})
# Callback to update interval based on button clicks
@self.app.callback(
Output('interval-store', 'data'),
[Input('btn-1s', 'n_clicks'),
Input('btn-5s', 'n_clicks'),
Input('btn-15s', 'n_clicks'),
Input('btn-30s', 'n_clicks'),
Input('btn-1m', 'n_clicks')],
[dash.dependencies.State('interval-store', 'data')]
)
def update_interval(n1, n5, n15, n30, n60, data):
ctx = dash.callback_context
if not ctx.triggered:
return data
button_id = ctx.triggered[0]['prop_id'].split('.')[0]
if button_id == 'btn-1s':
return {'interval': 1}
elif button_id == 'btn-5s':
return {'interval': 5}
elif button_id == 'btn-15s':
return {'interval': 15}
elif button_id == 'btn-30s':
return {'interval': 30}
elif button_id == 'btn-1m':
return {'interval': 60}
return data
# Callback to update the chart
@self.app.callback(
Output('live-chart', 'figure'),
[Input('interval-component', 'n_intervals'),
Input('interval-store', 'data')]
)
def update_chart(n, interval_data):
try:
interval = interval_data.get('interval', 1)
logger.info(f"Updating chart for {self.symbol} with interval {interval}s")
fig = make_subplots(
rows=2, cols=1,
shared_xaxis=True,
vertical_spacing=0.03,
subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume'),
row_heights=[0.7, 0.3]
)
# Get candlesticks from tick storage
df = self.tick_storage.get_candles(interval_seconds=interval)
# Debug information about the dataframe
logger.info(f"Candles dataframe empty: {df.empty}, tick count: {len(self.tick_storage.ticks)}")
if not df.empty and len(df) > 0:
logger.info(f"Candles dataframe shape: {df.shape}")
logger.info(f"Candles dataframe columns: {df.columns.tolist()}")
logger.info(f"Candles dataframe first row: {df.iloc[0].to_dict() if len(df) > 0 else 'No rows'}")
# Add candlestick chart
fig.add_trace(
go.Candlestick(
x=df['timestamp'],
open=df['open'],
high=df['high'],
low=df['low'],
close=df['close'],
name='Price',
increasing_line_color='#33CC33', # Green
decreasing_line_color='#FF4136' # Red
),
row=1, col=1
)
# Add volume bars
colors = ['#33CC33' if close >= open else '#FF4136'
for close, open in zip(df['close'], df['open'])]
fig.add_trace(
go.Bar(
x=df['timestamp'],
y=df['volume'],
name='Volume',
marker_color=colors
),
row=2, col=1
)
# Add latest price line and annotation
latest_price = df['close'].iloc[-1]
fig.add_shape(
type="line",
x0=df['timestamp'].min(),
y0=latest_price,
x1=df['timestamp'].max(),
y1=latest_price,
line=dict(color="yellow", width=1, dash="dash"),
row=1, col=1
)
fig.add_annotation(
x=df['timestamp'].max(),
y=latest_price,
text=f"{latest_price:.2f}",
showarrow=False,
font=dict(size=14, color="yellow"),
xshift=50,
row=1, col=1
)
else:
# If no data, add a text annotation to the chart
logger.warning(f"No data to display for {self.symbol} - tick count: {len(self.tick_storage.ticks)}")
if self.tick_storage.ticks:
logger.info(f"Sample tick: {self.tick_storage.ticks[0]}")
# Add a message to the empty chart
fig.add_annotation(
x=0.5, y=0.5,
text=f"Waiting for {self.symbol} data...",
showarrow=False,
font=dict(size=20, color="white"),
xref="paper", yref="paper"
)
# Update layout with improved styling
fig.update_layout(
title_text=f"{self.symbol} Real-Time Data ({interval}s candles)",
title_x=0.5, # Center the title
xaxis_rangeslider_visible=False,
height=800,
template='plotly_dark',
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
font=dict(family="Arial, sans-serif", size=12, color="#2c3e50"),
showlegend=True,
legend=dict(
yanchor="top",
y=0.99,
xanchor="left",
x=0.01
)
)
# Update axes styling
fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)')
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='rgba(128,128,128,0.2)')
return fig
except Exception as e:
logger.error(f"Error updating chart: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# Create a minimal figure with error message
fig = go.Figure()
fig.add_annotation(
x=0.5, y=0.5,
text=f"Error updating chart: {str(e)}",
showarrow=False,
font=dict(size=14, color="red"),
xref="paper", yref="paper"
)
fig.update_layout(
height=800,
template='plotly_dark',
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)'
)
return fig
async def start_websocket(self):
ws = ExchangeWebSocket(self.symbol)
while True: # Keep trying to maintain connection
if not await ws.connect():
logger.error(f"Failed to connect to exchange for {self.symbol}")
await asyncio.sleep(5)
continue
try:
logger.info(f"WebSocket connected for {self.symbol}, beginning data collection")
tick_count = 0
last_tick_count_log = time.time()
while True:
if not ws.running:
logger.warning("WebSocket not running, breaking loop")
break
data = await ws.receive()
if data:
if data.get('type') == 'kline':
# Use kline data directly for candlestick
trade_data = {
'timestamp': data['timestamp'],
'price': data['price'],
'volume': data['volume'],
'open': data['open'],
'high': data['high'],
'low': data['low']
}
logger.debug(f"Received kline data: {data}")
else:
# Use trade data
trade_data = {
'timestamp': data['timestamp'],
'price': data['price'],
'volume': data['volume']
}
# Store raw tick in the tick storage
self.tick_storage.add_tick(trade_data)
tick_count += 1
# Also update the old candlestick data for backward compatibility
self.candlestick_data.update_from_trade(trade_data)
# Log tick counts periodically
current_time = time.time()
if current_time - last_tick_count_log >= 10: # Log every 10 seconds
logger.info(f"{self.symbol}: Collected {tick_count} ticks in last {current_time - last_tick_count_log:.1f}s, total: {len(self.tick_storage.ticks)}")
last_tick_count_log = current_time
tick_count = 0
# Check if ticks are being converted to candles
if len(self.tick_storage.ticks) > 0:
sample_df = self.tick_storage.get_candles(interval_seconds=1)
logger.info(f"{self.symbol}: Sample candle count: {len(sample_df)}")
await asyncio.sleep(0.01)
except Exception as e:
logger.error(f"Error in WebSocket loop: {str(e)}")
finally:
await ws.close()
logger.info("Waiting 5 seconds before reconnecting...")
await asyncio.sleep(5)
def run(self, host='localhost', port=8050):
try:
logger.info(f"Starting Dash server for {self.symbol} on {host}:{port}")
self.app.run(debug=False, host=host, port=port)
except Exception as e:
logger.error(f"Error running Dash server on port {port}: {str(e)}")
# Try an alternative port if the primary one is in use
if "Address already in use" in str(e):
alt_port = port + 100
logger.warning(f"Port {port} is busy, trying alternative port {alt_port}")
try:
self.app.run(debug=False, host=host, port=alt_port)
except Exception as alt_e:
logger.error(f"Error running Dash server on alternative port {alt_port}: {str(alt_e)}")
else:
# Re-raise other exceptions
raise
async def main():
symbols = ["ETH/USDT", "BTC/USDT"]
logger.info(f"Starting application for symbols: {symbols}")
charts = []
websocket_tasks = []
# Create a chart and websocket task for each symbol
for symbol in symbols:
chart = RealTimeChart(symbol)
charts.append(chart)
websocket_tasks.append(asyncio.create_task(chart.start_websocket()))
# Run Dash in a separate thread to not block the event loop
server_threads = []
for i, chart in enumerate(charts):
port = 8050 + i # Use different ports for each chart
logger.info(f"Starting chart for {chart.symbol} on port {port}")
thread = Thread(target=lambda c=chart, p=port: c.run(port=p)) # Ensure correct port is passed
thread.daemon = True
thread.start()
server_threads.append(thread)
logger.info(f"Thread started for {chart.symbol} on port {port}")
try:
# Keep the main task running
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down...")
except Exception as e:
logger.error(f"Unexpected error: {str(e)}")
finally:
for task in websocket_tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Application terminated by user")