This commit is contained in:
Dobromir Popov 2025-03-19 00:47:27 +02:00
parent 09ff398fef
commit 23f84dd2f8

View File

@ -375,153 +375,6 @@ class CandlestickData:
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
return df
class MEXCWebSocket:
"""MEXC-specific WebSocket implementation"""
def __init__(self, symbol: str):
self.symbol = symbol.replace('/', '').upper()
self.ws = None
self.running = False
self.reconnect_delay = 1
self.max_reconnect_delay = 60
self.ping_interval = 20
self.last_ping_time = 0
self.message_count = 0
# MEXC WebSocket configuration
self.ws_url = "wss://wbs-api.mexc.com/ws"
self.ws_sub_params = [
f"spot@public.kline.v3.api@{self.symbol}@Min1"
]
self.subscribe_msgs = [
{
"method": "SUBSCRIPTION",
"params": self.ws_sub_params
}
]
logger.info(f"Initialized MEXC WebSocket for symbol: {self.symbol}")
logger.debug(f"Subscribe messages: {json.dumps(self.subscribe_msgs)}")
async def connect(self):
while True:
try:
logger.info(f"Attempting to connect to {self.ws_url}")
self.ws = await websockets.connect(self.ws_url)
logger.info("WebSocket connection established")
# Subscribe to the streams
for msg in self.subscribe_msgs:
logger.info(f"Sending subscription message: {json.dumps(msg)}")
await self.ws.send(json.dumps(msg))
# Wait for subscription confirmation
response = await self.ws.recv()
logger.info(f"Subscription response: {response}")
if "Not Subscribed" in response:
logger.error(f"Subscription error: {response}")
await self.unsubscribe()
await self.close()
return False
self.running = True
self.reconnect_delay = 1
logger.info(f"Successfully connected to MEXC WebSocket for {self.symbol}")
# Start ping task
asyncio.create_task(self.ping_loop())
return True
except Exception as e:
logger.error(f"WebSocket connection error: {str(e)}")
await self.unsubscribe()
await asyncio.sleep(self.reconnect_delay)
self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
continue
async def ping_loop(self):
"""Send ping messages to keep connection alive"""
while self.running:
try:
current_time = time.time()
if current_time - self.last_ping_time >= self.ping_interval:
ping_msg = {"method": "PING"}
logger.debug("Sending ping")
await self.ws.send(json.dumps(ping_msg))
self.last_ping_time = current_time
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Error in ping loop: {str(e)}")
break
async def receive(self) -> Optional[Dict]:
if not self.ws:
return None
try:
message = await self.ws.recv()
self.message_count += 1
if self.message_count % 10 == 0:
logger.info(f"Received message #{self.message_count}")
logger.debug(f"Raw message: {message[:200]}...")
if isinstance(message, bytes):
return None
data = json.loads(message)
# Handle PONG response
if isinstance(data, dict) and data.get('msg') == 'PONG':
logger.debug("Received pong")
return None
# Handle kline data
if isinstance(data, dict) and 'data' in data and isinstance(data['data'], list):
kline = data['data'][0]
if len(kline) >= 6:
kline_data = {
'timestamp': int(kline[0]), # Timestamp
'open': float(kline[1]), # Open
'high': float(kline[2]), # High
'low': float(kline[3]), # Low
'price': float(kline[4]), # Close
'volume': float(kline[5]), # Volume
'type': 'kline'
}
logger.info(f"Processed kline data: {kline_data}")
return kline_data
return None
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket connection closed")
self.running = False
return None
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...")
return None
except Exception as e:
logger.error(f"Error receiving message: {str(e)}")
return None
async def unsubscribe(self):
"""Unsubscribe from all channels"""
if self.ws:
for msg in self.subscribe_msgs:
unsub_msg = {
"method": "UNSUBSCRIPTION",
"params": msg["params"]
}
try:
await self.ws.send(json.dumps(unsub_msg))
except:
pass
async def close(self):
"""Close the WebSocket connection"""
if self.ws:
await self.unsubscribe()
await self.ws.close()
self.running = False
logger.info("WebSocket connection closed")
class BinanceWebSocket:
"""Binance WebSocket implementation for real-time tick data"""
def __init__(self, symbol: str):
@ -597,6 +450,148 @@ class BinanceWebSocket:
self.running = False
logger.info("WebSocket connection closed")
class BinanceHistoricalData:
"""Fetch historical candle data from Binance"""
def __init__(self):
self.base_url = "https://api.binance.com/api/v3/klines"
# Create a cache directory if it doesn't exist
self.cache_dir = os.path.join(os.getcwd(), "cache")
os.makedirs(self.cache_dir, exist_ok=True)
logger.info(f"Initialized BinanceHistoricalData with cache directory: {self.cache_dir}")
def _get_interval_string(self, interval_seconds: int) -> str:
"""Convert interval seconds to Binance interval string"""
if interval_seconds == 60: # 1m
return "1m"
elif interval_seconds == 3600: # 1h
return "1h"
elif interval_seconds == 86400: # 1d
return "1d"
else:
# Default to 1m if not recognized
logger.warning(f"Unrecognized interval {interval_seconds}s, defaulting to 1m")
return "1m"
def _get_cache_filename(self, symbol: str, interval: str) -> str:
"""Generate cache filename for the symbol and interval"""
# Replace any slashes in symbol with underscore
safe_symbol = symbol.replace("/", "_")
return os.path.join(self.cache_dir, f"{safe_symbol}_{interval}_candles.csv")
def _load_from_cache(self, symbol: str, interval: str) -> Optional[pd.DataFrame]:
"""Load candle data from cache if available and not expired"""
filename = self._get_cache_filename(symbol, interval)
if not os.path.exists(filename):
logger.debug(f"No cache file found for {symbol} {interval}")
return None
# Check if cache is fresh (less than 1 hour old for anything but 1d, 1 day for 1d)
file_age = time.time() - os.path.getmtime(filename)
max_age = 86400 if interval == "1d" else 3600 # 1 day for 1d, 1 hour for others
if file_age > max_age:
logger.debug(f"Cache for {symbol} {interval} is expired ({file_age:.1f}s old)")
return None
try:
df = pd.read_csv(filename)
# Convert timestamp string back to datetime
df['timestamp'] = pd.to_datetime(df['timestamp'])
logger.info(f"Loaded {len(df)} candles from cache for {symbol} {interval}")
return df
except Exception as e:
logger.error(f"Error loading from cache: {str(e)}")
return None
def _save_to_cache(self, df: pd.DataFrame, symbol: str, interval: str) -> bool:
"""Save candle data to cache"""
if df.empty:
logger.warning(f"No data to cache for {symbol} {interval}")
return False
filename = self._get_cache_filename(symbol, interval)
try:
df.to_csv(filename, index=False)
logger.info(f"Cached {len(df)} candles for {symbol} {interval} to {filename}")
return True
except Exception as e:
logger.error(f"Error saving to cache: {str(e)}")
return False
def get_historical_candles(self, symbol: str, interval_seconds: int, limit: int = 500) -> pd.DataFrame:
"""Get historical candle data for the specified symbol and interval"""
# Convert to Binance format
clean_symbol = symbol.replace("/", "")
interval = self._get_interval_string(interval_seconds)
# Try to load from cache first
cached_data = self._load_from_cache(symbol, interval)
if cached_data is not None and len(cached_data) >= limit:
return cached_data.tail(limit)
# Fetch from API if not cached or insufficient
try:
logger.info(f"Fetching {limit} historical candles for {symbol} ({interval}) from Binance API")
params = {
"symbol": clean_symbol,
"interval": interval,
"limit": limit
}
response = requests.get(self.base_url, params=params)
response.raise_for_status() # Raise exception for HTTP errors
# Process the data
candles = response.json()
if not candles:
logger.warning(f"No candles returned from Binance for {symbol} {interval}")
return pd.DataFrame()
# Convert to DataFrame - Binance returns data in this format:
# [
# [
# 1499040000000, // Open time
# "0.01634790", // Open
# "0.80000000", // High
# "0.01575800", // Low
# "0.01577100", // Close
# "148976.11427815", // Volume
# ... // Ignore the rest
# ],
# ...
# ]
df = pd.DataFrame(candles, columns=[
"timestamp", "open", "high", "low", "close", "volume",
"close_time", "quote_asset_volume", "number_of_trades",
"taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
])
# Convert types
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
for col in ["open", "high", "low", "close", "volume"]:
df[col] = df[col].astype(float)
# Keep only needed columns
df = df[["timestamp", "open", "high", "low", "close", "volume"]]
# Cache the results
self._save_to_cache(df, symbol, interval)
logger.info(f"Successfully fetched {len(df)} candles for {symbol} {interval}")
return df
except Exception as e:
logger.error(f"Error fetching historical data for {symbol} {interval}: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return pd.DataFrame()
class ExchangeWebSocket:
"""Generic WebSocket interface for cryptocurrency exchanges"""
def __init__(self, symbol: str, exchange: str = "binance"):
@ -1899,146 +1894,6 @@ class RealTimeChart:
import traceback
logger.error(traceback.format_exc())
class BinanceHistoricalData:
"""Fetch historical candle data from Binance"""
def __init__(self):
self.base_url = "https://api.binance.com/api/v3/klines"
# Create a cache directory if it doesn't exist
self.cache_dir = os.path.join(os.getcwd(), "cache")
os.makedirs(self.cache_dir, exist_ok=True)
logger.info(f"Initialized BinanceHistoricalData with cache directory: {self.cache_dir}")
def _get_interval_string(self, interval_seconds: int) -> str:
"""Convert interval seconds to Binance interval string"""
if interval_seconds == 60: # 1m
return "1m"
elif interval_seconds == 3600: # 1h
return "1h"
elif interval_seconds == 86400: # 1d
return "1d"
else:
# Default to 1m if not recognized
logger.warning(f"Unrecognized interval {interval_seconds}s, defaulting to 1m")
return "1m"
def _get_cache_filename(self, symbol: str, interval: str) -> str:
"""Generate cache filename for the symbol and interval"""
# Replace any slashes in symbol with underscore
safe_symbol = symbol.replace("/", "_")
return os.path.join(self.cache_dir, f"{safe_symbol}_{interval}_candles.csv")
def _load_from_cache(self, symbol: str, interval: str) -> Optional[pd.DataFrame]:
"""Load candle data from cache if available and not expired"""
filename = self._get_cache_filename(symbol, interval)
if not os.path.exists(filename):
logger.debug(f"No cache file found for {symbol} {interval}")
return None
# Check if cache is fresh (less than 1 hour old for anything but 1d, 1 day for 1d)
file_age = time.time() - os.path.getmtime(filename)
max_age = 86400 if interval == "1d" else 3600 # 1 day for 1d, 1 hour for others
if file_age > max_age:
logger.debug(f"Cache for {symbol} {interval} is expired ({file_age:.1f}s old)")
return None
try:
df = pd.read_csv(filename)
# Convert timestamp string back to datetime
df['timestamp'] = pd.to_datetime(df['timestamp'])
logger.info(f"Loaded {len(df)} candles from cache for {symbol} {interval}")
return df
except Exception as e:
logger.error(f"Error loading from cache: {str(e)}")
return None
def _save_to_cache(self, df: pd.DataFrame, symbol: str, interval: str) -> bool:
"""Save candle data to cache"""
if df.empty:
logger.warning(f"No data to cache for {symbol} {interval}")
return False
filename = self._get_cache_filename(symbol, interval)
try:
df.to_csv(filename, index=False)
logger.info(f"Cached {len(df)} candles for {symbol} {interval} to {filename}")
return True
except Exception as e:
logger.error(f"Error saving to cache: {str(e)}")
return False
def get_historical_candles(self, symbol: str, interval_seconds: int, limit: int = 500) -> pd.DataFrame:
"""Get historical candle data for the specified symbol and interval"""
# Convert to Binance format
clean_symbol = symbol.replace("/", "")
interval = self._get_interval_string(interval_seconds)
# Try to load from cache first
cached_data = self._load_from_cache(symbol, interval)
if cached_data is not None and len(cached_data) >= limit:
return cached_data.tail(limit)
# Fetch from API if not cached or insufficient
try:
logger.info(f"Fetching {limit} historical candles for {symbol} ({interval}) from Binance API")
params = {
"symbol": clean_symbol,
"interval": interval,
"limit": limit
}
response = requests.get(self.base_url, params=params)
response.raise_for_status() # Raise exception for HTTP errors
# Process the data
candles = response.json()
if not candles:
logger.warning(f"No candles returned from Binance for {symbol} {interval}")
return pd.DataFrame()
# Convert to DataFrame - Binance returns data in this format:
# [
# [
# 1499040000000, // Open time
# "0.01634790", // Open
# "0.80000000", // High
# "0.01575800", // Low
# "0.01577100", // Close
# "148976.11427815", // Volume
# ... // Ignore the rest
# ],
# ...
# ]
df = pd.DataFrame(candles, columns=[
"timestamp", "open", "high", "low", "close", "volume",
"close_time", "quote_asset_volume", "number_of_trades",
"taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
])
# Convert types
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
for col in ["open", "high", "low", "close", "volume"]:
df[col] = df[col].astype(float)
# Keep only needed columns
df = df[["timestamp", "open", "high", "low", "close", "volume"]]
# Cache the results
self._save_to_cache(df, symbol, interval)
logger.info(f"Successfully fetched {len(df)} candles for {symbol} {interval}")
return df
except Exception as e:
logger.error(f"Error fetching historical data for {symbol} {interval}: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return pd.DataFrame()
async def main():
symbols = ["ETH/USDT", "BTC/USDT"]