""" Bybit Raw REST API Client Implementation using direct HTTP calls with proper authentication Based on Bybit API v5 documentation and official examples and https://github.com/bybit-exchange/api-connectors/blob/master/encryption_example/Encryption.py """ import hmac import hashlib import time import json import logging import requests from typing import Dict, Any, Optional from urllib.parse import urlencode logger = logging.getLogger(__name__) class BybitRestClient: """Raw REST API client for Bybit with proper authentication and rate limiting.""" def __init__(self, api_key: str, api_secret: str, testnet: bool = False): """Initialize Bybit REST client. Args: api_key: Bybit API key api_secret: Bybit API secret testnet: If True, use testnet endpoints """ self.api_key = api_key self.api_secret = api_secret self.testnet = testnet # API endpoints if testnet: self.base_url = "https://api-testnet.bybit.com" else: self.base_url = "https://api.bybit.com" # Rate limiting self.last_request_time = 0 self.min_request_interval = 0.1 # 100ms between requests # Request session for connection pooling self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'gogo2-trading-bot/1.0', 'Content-Type': 'application/json' }) logger.info(f"Initialized Bybit REST client (testnet: {testnet})") def _generate_signature(self, timestamp: str, params: str) -> str: """Generate HMAC-SHA256 signature for Bybit API. Args: timestamp: Request timestamp params: Query parameters or request body Returns: HMAC-SHA256 signature """ # Bybit signature format: timestamp + api_key + recv_window + params recv_window = "5000" # 5 seconds param_str = f"{timestamp}{self.api_key}{recv_window}{params}" signature = hmac.new( self.api_secret.encode('utf-8'), param_str.encode('utf-8'), hashlib.sha256 ).hexdigest() return signature def _get_headers(self, timestamp: str, signature: str) -> Dict[str, str]: """Get request headers with authentication. Args: timestamp: Request timestamp signature: HMAC signature Returns: Headers dictionary """ return { 'X-BAPI-API-KEY': self.api_key, 'X-BAPI-SIGN': signature, 'X-BAPI-TIMESTAMP': timestamp, 'X-BAPI-RECV-WINDOW': '5000', 'Content-Type': 'application/json' } def _rate_limit(self): """Apply rate limiting between requests.""" current_time = time.time() time_since_last = current_time - self.last_request_time if time_since_last < self.min_request_interval: sleep_time = self.min_request_interval - time_since_last time.sleep(sleep_time) self.last_request_time = time.time() def _make_request(self, method: str, endpoint: str, params: Dict = None, signed: bool = False) -> Dict[str, Any]: """Make HTTP request to Bybit API. Args: method: HTTP method (GET, POST, etc.) endpoint: API endpoint path params: Request parameters signed: Whether request requires authentication Returns: API response as dictionary """ self._rate_limit() url = f"{self.base_url}{endpoint}" timestamp = str(int(time.time() * 1000)) if params is None: params = {} headers = {'Content-Type': 'application/json'} if signed: if method == 'GET': # For GET requests, params go in query string query_string = urlencode(sorted(params.items())) signature = self._generate_signature(timestamp, query_string) headers.update(self._get_headers(timestamp, signature)) response = self.session.get(url, params=params, headers=headers) else: # For POST/PUT/DELETE, params go in body body = json.dumps(params) if params else "" signature = self._generate_signature(timestamp, body) headers.update(self._get_headers(timestamp, signature)) response = self.session.request(method, url, data=body, headers=headers) else: # Public endpoint if method == 'GET': response = self.session.get(url, params=params, headers=headers) else: body = json.dumps(params) if params else "" response = self.session.request(method, url, data=body, headers=headers) # Log request details for debugging logger.debug(f"{method} {url} - Status: {response.status_code}") try: result = response.json() except json.JSONDecodeError: logger.error(f"Failed to decode JSON response: {response.text}") raise Exception(f"Invalid JSON response: {response.text}") # Check for API errors if response.status_code != 200: error_msg = result.get('retMsg', f'HTTP {response.status_code}') logger.error(f"API Error: {error_msg}") raise Exception(f"Bybit API Error: {error_msg}") if result.get('retCode') != 0: error_msg = result.get('retMsg', 'Unknown error') error_code = result.get('retCode', 'Unknown') logger.error(f"Bybit Error {error_code}: {error_msg}") raise Exception(f"Bybit Error {error_code}: {error_msg}") return result def get_server_time(self) -> Dict[str, Any]: """Get server time (public endpoint).""" return self._make_request('GET', '/v5/market/time') def get_account_info(self) -> Dict[str, Any]: """Get account information (private endpoint).""" return self._make_request('GET', '/v5/account/wallet-balance', {'accountType': 'UNIFIED'}, signed=True) def get_ticker(self, symbol: str, category: str = "linear") -> Dict[str, Any]: """Get ticker information. Args: symbol: Trading symbol (e.g., BTCUSDT) category: Product category (linear, inverse, spot, option) """ params = {'category': category, 'symbol': symbol} return self._make_request('GET', '/v5/market/tickers', params) def get_orderbook(self, symbol: str, category: str = "linear", limit: int = 25) -> Dict[str, Any]: """Get orderbook data. Args: symbol: Trading symbol category: Product category limit: Number of price levels (max 200) """ params = {'category': category, 'symbol': symbol, 'limit': min(limit, 200)} return self._make_request('GET', '/v5/market/orderbook', params) def get_positions(self, category: str = "linear", symbol: str = None) -> Dict[str, Any]: """Get position information. Args: category: Product category symbol: Trading symbol (optional) """ params = {'category': category} if symbol: params['symbol'] = symbol return self._make_request('GET', '/v5/position/list', params, signed=True) def get_open_orders(self, category: str = "linear", symbol: str = None) -> Dict[str, Any]: """Get open orders with caching. Args: category: Product category symbol: Trading symbol (optional) """ params = {'category': category, 'openOnly': True} if symbol: params['symbol'] = symbol return self._make_request('GET', '/v5/order/realtime', params, signed=True) def place_order(self, category: str, symbol: str, side: str, order_type: str, qty: str, price: str = None, **kwargs) -> Dict[str, Any]: """Place an order. Args: category: Product category (linear, inverse, spot, option) symbol: Trading symbol side: Buy or Sell order_type: Market, Limit, etc. qty: Order quantity as string price: Order price as string (for limit orders) **kwargs: Additional order parameters """ params = { 'category': category, 'symbol': symbol, 'side': side, 'orderType': order_type, 'qty': qty } if price: params['price'] = price # Add additional parameters params.update(kwargs) return self._make_request('POST', '/v5/order/create', params, signed=True) def cancel_order(self, category: str, symbol: str, order_id: str = None, order_link_id: str = None) -> Dict[str, Any]: """Cancel an order. Args: category: Product category symbol: Trading symbol order_id: Order ID order_link_id: Order link ID (alternative to order_id) """ params = {'category': category, 'symbol': symbol} if order_id: params['orderId'] = order_id elif order_link_id: params['orderLinkId'] = order_link_id else: raise ValueError("Either order_id or order_link_id must be provided") return self._make_request('POST', '/v5/order/cancel', params, signed=True) def get_instruments_info(self, category: str = "linear", symbol: str = None) -> Dict[str, Any]: """Get instruments information. Args: category: Product category symbol: Trading symbol (optional) """ params = {'category': category} if symbol: params['symbol'] = symbol return self._make_request('GET', '/v5/market/instruments-info', params) def test_connectivity(self) -> bool: """Test API connectivity. Returns: True if connected successfully """ try: result = self.get_server_time() logger.info("✅ Bybit REST API connectivity test successful") return True except Exception as e: logger.error(f"❌ Bybit REST API connectivity test failed: {e}") return False def test_authentication(self) -> bool: """Test API authentication. Returns: True if authentication successful """ try: result = self.get_account_info() logger.info("✅ Bybit REST API authentication test successful") return True except Exception as e: logger.error(f"❌ Bybit REST API authentication test failed: {e}") return False