diff --git a/realtime.py b/realtime.py index 6aa908c..cf63db4 100644 --- a/realtime.py +++ b/realtime.py @@ -375,153 +375,6 @@ class CandlestickData: df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') return df -class MEXCWebSocket: - """MEXC-specific WebSocket implementation""" - def __init__(self, symbol: str): - self.symbol = symbol.replace('/', '').upper() - self.ws = None - self.running = False - self.reconnect_delay = 1 - self.max_reconnect_delay = 60 - self.ping_interval = 20 - self.last_ping_time = 0 - self.message_count = 0 - - # MEXC WebSocket configuration - self.ws_url = "wss://wbs-api.mexc.com/ws" - self.ws_sub_params = [ - f"spot@public.kline.v3.api@{self.symbol}@Min1" - ] - self.subscribe_msgs = [ - { - "method": "SUBSCRIPTION", - "params": self.ws_sub_params - } - ] - logger.info(f"Initialized MEXC WebSocket for symbol: {self.symbol}") - logger.debug(f"Subscribe messages: {json.dumps(self.subscribe_msgs)}") - - async def connect(self): - while True: - try: - logger.info(f"Attempting to connect to {self.ws_url}") - self.ws = await websockets.connect(self.ws_url) - logger.info("WebSocket connection established") - - # Subscribe to the streams - for msg in self.subscribe_msgs: - logger.info(f"Sending subscription message: {json.dumps(msg)}") - await self.ws.send(json.dumps(msg)) - - # Wait for subscription confirmation - response = await self.ws.recv() - logger.info(f"Subscription response: {response}") - if "Not Subscribed" in response: - logger.error(f"Subscription error: {response}") - await self.unsubscribe() - await self.close() - return False - - self.running = True - self.reconnect_delay = 1 - logger.info(f"Successfully connected to MEXC WebSocket for {self.symbol}") - - # Start ping task - asyncio.create_task(self.ping_loop()) - return True - except Exception as e: - logger.error(f"WebSocket connection error: {str(e)}") - await self.unsubscribe() - await asyncio.sleep(self.reconnect_delay) - self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay) - continue - - async def ping_loop(self): - """Send ping messages to keep connection alive""" - while self.running: - try: - current_time = time.time() - if current_time - self.last_ping_time >= self.ping_interval: - ping_msg = {"method": "PING"} - logger.debug("Sending ping") - await self.ws.send(json.dumps(ping_msg)) - self.last_ping_time = current_time - await asyncio.sleep(1) - except Exception as e: - logger.error(f"Error in ping loop: {str(e)}") - break - - async def receive(self) -> Optional[Dict]: - if not self.ws: - return None - - try: - message = await self.ws.recv() - self.message_count += 1 - - if self.message_count % 10 == 0: - logger.info(f"Received message #{self.message_count}") - logger.debug(f"Raw message: {message[:200]}...") - - if isinstance(message, bytes): - return None - - data = json.loads(message) - - # Handle PONG response - if isinstance(data, dict) and data.get('msg') == 'PONG': - logger.debug("Received pong") - return None - - # Handle kline data - if isinstance(data, dict) and 'data' in data and isinstance(data['data'], list): - kline = data['data'][0] - if len(kline) >= 6: - kline_data = { - 'timestamp': int(kline[0]), # Timestamp - 'open': float(kline[1]), # Open - 'high': float(kline[2]), # High - 'low': float(kline[3]), # Low - 'price': float(kline[4]), # Close - 'volume': float(kline[5]), # Volume - 'type': 'kline' - } - logger.info(f"Processed kline data: {kline_data}") - return kline_data - - return None - except websockets.exceptions.ConnectionClosed: - logger.warning("WebSocket connection closed") - self.running = False - return None - except json.JSONDecodeError as e: - logger.error(f"JSON decode error: {str(e)}, message: {message[:200]}...") - return None - except Exception as e: - logger.error(f"Error receiving message: {str(e)}") - return None - - async def unsubscribe(self): - """Unsubscribe from all channels""" - if self.ws: - for msg in self.subscribe_msgs: - unsub_msg = { - "method": "UNSUBSCRIPTION", - "params": msg["params"] - } - try: - await self.ws.send(json.dumps(unsub_msg)) - except: - pass - - async def close(self): - """Close the WebSocket connection""" - if self.ws: - await self.unsubscribe() - await self.ws.close() - self.running = False - logger.info("WebSocket connection closed") - class BinanceWebSocket: """Binance WebSocket implementation for real-time tick data""" def __init__(self, symbol: str): @@ -597,6 +450,148 @@ class BinanceWebSocket: self.running = False logger.info("WebSocket connection closed") +class BinanceHistoricalData: + """Fetch historical candle data from Binance""" + + def __init__(self): + self.base_url = "https://api.binance.com/api/v3/klines" + # Create a cache directory if it doesn't exist + self.cache_dir = os.path.join(os.getcwd(), "cache") + os.makedirs(self.cache_dir, exist_ok=True) + logger.info(f"Initialized BinanceHistoricalData with cache directory: {self.cache_dir}") + + def _get_interval_string(self, interval_seconds: int) -> str: + """Convert interval seconds to Binance interval string""" + if interval_seconds == 60: # 1m + return "1m" + elif interval_seconds == 3600: # 1h + return "1h" + elif interval_seconds == 86400: # 1d + return "1d" + else: + # Default to 1m if not recognized + logger.warning(f"Unrecognized interval {interval_seconds}s, defaulting to 1m") + return "1m" + + def _get_cache_filename(self, symbol: str, interval: str) -> str: + """Generate cache filename for the symbol and interval""" + # Replace any slashes in symbol with underscore + safe_symbol = symbol.replace("/", "_") + return os.path.join(self.cache_dir, f"{safe_symbol}_{interval}_candles.csv") + + def _load_from_cache(self, symbol: str, interval: str) -> Optional[pd.DataFrame]: + """Load candle data from cache if available and not expired""" + filename = self._get_cache_filename(symbol, interval) + + if not os.path.exists(filename): + logger.debug(f"No cache file found for {symbol} {interval}") + return None + + # Check if cache is fresh (less than 1 hour old for anything but 1d, 1 day for 1d) + file_age = time.time() - os.path.getmtime(filename) + max_age = 86400 if interval == "1d" else 3600 # 1 day for 1d, 1 hour for others + + if file_age > max_age: + logger.debug(f"Cache for {symbol} {interval} is expired ({file_age:.1f}s old)") + return None + + try: + df = pd.read_csv(filename) + # Convert timestamp string back to datetime + df['timestamp'] = pd.to_datetime(df['timestamp']) + logger.info(f"Loaded {len(df)} candles from cache for {symbol} {interval}") + return df + except Exception as e: + logger.error(f"Error loading from cache: {str(e)}") + return None + + def _save_to_cache(self, df: pd.DataFrame, symbol: str, interval: str) -> bool: + """Save candle data to cache""" + if df.empty: + logger.warning(f"No data to cache for {symbol} {interval}") + return False + + filename = self._get_cache_filename(symbol, interval) + try: + df.to_csv(filename, index=False) + logger.info(f"Cached {len(df)} candles for {symbol} {interval} to {filename}") + return True + except Exception as e: + logger.error(f"Error saving to cache: {str(e)}") + return False + + def get_historical_candles(self, symbol: str, interval_seconds: int, limit: int = 500) -> pd.DataFrame: + """Get historical candle data for the specified symbol and interval""" + # Convert to Binance format + clean_symbol = symbol.replace("/", "") + interval = self._get_interval_string(interval_seconds) + + # Try to load from cache first + cached_data = self._load_from_cache(symbol, interval) + if cached_data is not None and len(cached_data) >= limit: + return cached_data.tail(limit) + + # Fetch from API if not cached or insufficient + try: + logger.info(f"Fetching {limit} historical candles for {symbol} ({interval}) from Binance API") + + params = { + "symbol": clean_symbol, + "interval": interval, + "limit": limit + } + + response = requests.get(self.base_url, params=params) + response.raise_for_status() # Raise exception for HTTP errors + + # Process the data + candles = response.json() + + if not candles: + logger.warning(f"No candles returned from Binance for {symbol} {interval}") + return pd.DataFrame() + + # Convert to DataFrame - Binance returns data in this format: + # [ + # [ + # 1499040000000, // Open time + # "0.01634790", // Open + # "0.80000000", // High + # "0.01575800", // Low + # "0.01577100", // Close + # "148976.11427815", // Volume + # ... // Ignore the rest + # ], + # ... + # ] + + df = pd.DataFrame(candles, columns=[ + "timestamp", "open", "high", "low", "close", "volume", + "close_time", "quote_asset_volume", "number_of_trades", + "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" + ]) + + # Convert types + df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') + for col in ["open", "high", "low", "close", "volume"]: + df[col] = df[col].astype(float) + + # Keep only needed columns + df = df[["timestamp", "open", "high", "low", "close", "volume"]] + + # Cache the results + self._save_to_cache(df, symbol, interval) + + logger.info(f"Successfully fetched {len(df)} candles for {symbol} {interval}") + return df + + except Exception as e: + logger.error(f"Error fetching historical data for {symbol} {interval}: {str(e)}") + import traceback + logger.error(traceback.format_exc()) + return pd.DataFrame() + + class ExchangeWebSocket: """Generic WebSocket interface for cryptocurrency exchanges""" def __init__(self, symbol: str, exchange: str = "binance"): @@ -1899,146 +1894,6 @@ class RealTimeChart: import traceback logger.error(traceback.format_exc()) -class BinanceHistoricalData: - """Fetch historical candle data from Binance""" - - def __init__(self): - self.base_url = "https://api.binance.com/api/v3/klines" - # Create a cache directory if it doesn't exist - self.cache_dir = os.path.join(os.getcwd(), "cache") - os.makedirs(self.cache_dir, exist_ok=True) - logger.info(f"Initialized BinanceHistoricalData with cache directory: {self.cache_dir}") - - def _get_interval_string(self, interval_seconds: int) -> str: - """Convert interval seconds to Binance interval string""" - if interval_seconds == 60: # 1m - return "1m" - elif interval_seconds == 3600: # 1h - return "1h" - elif interval_seconds == 86400: # 1d - return "1d" - else: - # Default to 1m if not recognized - logger.warning(f"Unrecognized interval {interval_seconds}s, defaulting to 1m") - return "1m" - - def _get_cache_filename(self, symbol: str, interval: str) -> str: - """Generate cache filename for the symbol and interval""" - # Replace any slashes in symbol with underscore - safe_symbol = symbol.replace("/", "_") - return os.path.join(self.cache_dir, f"{safe_symbol}_{interval}_candles.csv") - - def _load_from_cache(self, symbol: str, interval: str) -> Optional[pd.DataFrame]: - """Load candle data from cache if available and not expired""" - filename = self._get_cache_filename(symbol, interval) - - if not os.path.exists(filename): - logger.debug(f"No cache file found for {symbol} {interval}") - return None - - # Check if cache is fresh (less than 1 hour old for anything but 1d, 1 day for 1d) - file_age = time.time() - os.path.getmtime(filename) - max_age = 86400 if interval == "1d" else 3600 # 1 day for 1d, 1 hour for others - - if file_age > max_age: - logger.debug(f"Cache for {symbol} {interval} is expired ({file_age:.1f}s old)") - return None - - try: - df = pd.read_csv(filename) - # Convert timestamp string back to datetime - df['timestamp'] = pd.to_datetime(df['timestamp']) - logger.info(f"Loaded {len(df)} candles from cache for {symbol} {interval}") - return df - except Exception as e: - logger.error(f"Error loading from cache: {str(e)}") - return None - - def _save_to_cache(self, df: pd.DataFrame, symbol: str, interval: str) -> bool: - """Save candle data to cache""" - if df.empty: - logger.warning(f"No data to cache for {symbol} {interval}") - return False - - filename = self._get_cache_filename(symbol, interval) - try: - df.to_csv(filename, index=False) - logger.info(f"Cached {len(df)} candles for {symbol} {interval} to {filename}") - return True - except Exception as e: - logger.error(f"Error saving to cache: {str(e)}") - return False - - def get_historical_candles(self, symbol: str, interval_seconds: int, limit: int = 500) -> pd.DataFrame: - """Get historical candle data for the specified symbol and interval""" - # Convert to Binance format - clean_symbol = symbol.replace("/", "") - interval = self._get_interval_string(interval_seconds) - - # Try to load from cache first - cached_data = self._load_from_cache(symbol, interval) - if cached_data is not None and len(cached_data) >= limit: - return cached_data.tail(limit) - - # Fetch from API if not cached or insufficient - try: - logger.info(f"Fetching {limit} historical candles for {symbol} ({interval}) from Binance API") - - params = { - "symbol": clean_symbol, - "interval": interval, - "limit": limit - } - - response = requests.get(self.base_url, params=params) - response.raise_for_status() # Raise exception for HTTP errors - - # Process the data - candles = response.json() - - if not candles: - logger.warning(f"No candles returned from Binance for {symbol} {interval}") - return pd.DataFrame() - - # Convert to DataFrame - Binance returns data in this format: - # [ - # [ - # 1499040000000, // Open time - # "0.01634790", // Open - # "0.80000000", // High - # "0.01575800", // Low - # "0.01577100", // Close - # "148976.11427815", // Volume - # ... // Ignore the rest - # ], - # ... - # ] - - df = pd.DataFrame(candles, columns=[ - "timestamp", "open", "high", "low", "close", "volume", - "close_time", "quote_asset_volume", "number_of_trades", - "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore" - ]) - - # Convert types - df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') - for col in ["open", "high", "low", "close", "volume"]: - df[col] = df[col].astype(float) - - # Keep only needed columns - df = df[["timestamp", "open", "high", "low", "close", "volume"]] - - # Cache the results - self._save_to_cache(df, symbol, interval) - - logger.info(f"Successfully fetched {len(df)} candles for {symbol} {interval}") - return df - - except Exception as e: - logger.error(f"Error fetching historical data for {symbol} {interval}: {str(e)}") - import traceback - logger.error(traceback.format_exc()) - return pd.DataFrame() async def main(): symbols = ["ETH/USDT", "BTC/USDT"]