314 lines
11 KiB
Python
314 lines
11 KiB
Python
"""
|
|
Bybit Raw REST API Client
|
|
Implementation using direct HTTP calls with proper authentication
|
|
Based on Bybit API v5 documentation and official examples and https://github.com/bybit-exchange/api-connectors/blob/master/encryption_example/Encryption.py
|
|
"""
|
|
|
|
import hmac
|
|
import hashlib
|
|
import time
|
|
import json
|
|
import logging
|
|
import requests
|
|
from typing import Dict, Any, Optional
|
|
from urllib.parse import urlencode
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BybitRestClient:
|
|
"""Raw REST API client for Bybit with proper authentication and rate limiting."""
|
|
|
|
def __init__(self, api_key: str, api_secret: str, testnet: bool = False):
|
|
"""Initialize Bybit REST client.
|
|
|
|
Args:
|
|
api_key: Bybit API key
|
|
api_secret: Bybit API secret
|
|
testnet: If True, use testnet endpoints
|
|
"""
|
|
self.api_key = api_key
|
|
self.api_secret = api_secret
|
|
self.testnet = testnet
|
|
|
|
# API endpoints
|
|
if testnet:
|
|
self.base_url = "https://api-testnet.bybit.com"
|
|
else:
|
|
self.base_url = "https://api.bybit.com"
|
|
|
|
# Rate limiting
|
|
self.last_request_time = 0
|
|
self.min_request_interval = 0.1 # 100ms between requests
|
|
|
|
# Request session for connection pooling
|
|
self.session = requests.Session()
|
|
self.session.headers.update({
|
|
'User-Agent': 'gogo2-trading-bot/1.0',
|
|
'Content-Type': 'application/json'
|
|
})
|
|
|
|
logger.info(f"Initialized Bybit REST client (testnet: {testnet})")
|
|
|
|
def _generate_signature(self, timestamp: str, params: str) -> str:
|
|
"""Generate HMAC-SHA256 signature for Bybit API.
|
|
|
|
Args:
|
|
timestamp: Request timestamp
|
|
params: Query parameters or request body
|
|
|
|
Returns:
|
|
HMAC-SHA256 signature
|
|
"""
|
|
# Bybit signature format: timestamp + api_key + recv_window + params
|
|
recv_window = "5000" # 5 seconds
|
|
param_str = f"{timestamp}{self.api_key}{recv_window}{params}"
|
|
|
|
signature = hmac.new(
|
|
self.api_secret.encode('utf-8'),
|
|
param_str.encode('utf-8'),
|
|
hashlib.sha256
|
|
).hexdigest()
|
|
|
|
return signature
|
|
|
|
def _get_headers(self, timestamp: str, signature: str) -> Dict[str, str]:
|
|
"""Get request headers with authentication.
|
|
|
|
Args:
|
|
timestamp: Request timestamp
|
|
signature: HMAC signature
|
|
|
|
Returns:
|
|
Headers dictionary
|
|
"""
|
|
return {
|
|
'X-BAPI-API-KEY': self.api_key,
|
|
'X-BAPI-SIGN': signature,
|
|
'X-BAPI-TIMESTAMP': timestamp,
|
|
'X-BAPI-RECV-WINDOW': '5000',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
|
|
def _rate_limit(self):
|
|
"""Apply rate limiting between requests."""
|
|
current_time = time.time()
|
|
time_since_last = current_time - self.last_request_time
|
|
|
|
if time_since_last < self.min_request_interval:
|
|
sleep_time = self.min_request_interval - time_since_last
|
|
time.sleep(sleep_time)
|
|
|
|
self.last_request_time = time.time()
|
|
|
|
def _make_request(self, method: str, endpoint: str, params: Dict = None, signed: bool = False) -> Dict[str, Any]:
|
|
"""Make HTTP request to Bybit API.
|
|
|
|
Args:
|
|
method: HTTP method (GET, POST, etc.)
|
|
endpoint: API endpoint path
|
|
params: Request parameters
|
|
signed: Whether request requires authentication
|
|
|
|
Returns:
|
|
API response as dictionary
|
|
"""
|
|
self._rate_limit()
|
|
|
|
url = f"{self.base_url}{endpoint}"
|
|
timestamp = str(int(time.time() * 1000))
|
|
|
|
if params is None:
|
|
params = {}
|
|
|
|
headers = {'Content-Type': 'application/json'}
|
|
|
|
if signed:
|
|
if method == 'GET':
|
|
# For GET requests, params go in query string
|
|
query_string = urlencode(sorted(params.items()))
|
|
signature = self._generate_signature(timestamp, query_string)
|
|
headers.update(self._get_headers(timestamp, signature))
|
|
|
|
response = self.session.get(url, params=params, headers=headers)
|
|
else:
|
|
# For POST/PUT/DELETE, params go in body
|
|
body = json.dumps(params) if params else ""
|
|
signature = self._generate_signature(timestamp, body)
|
|
headers.update(self._get_headers(timestamp, signature))
|
|
|
|
response = self.session.request(method, url, data=body, headers=headers)
|
|
else:
|
|
# Public endpoint
|
|
if method == 'GET':
|
|
response = self.session.get(url, params=params, headers=headers)
|
|
else:
|
|
body = json.dumps(params) if params else ""
|
|
response = self.session.request(method, url, data=body, headers=headers)
|
|
|
|
# Log request details for debugging
|
|
logger.debug(f"{method} {url} - Status: {response.status_code}")
|
|
|
|
try:
|
|
result = response.json()
|
|
except json.JSONDecodeError:
|
|
logger.error(f"Failed to decode JSON response: {response.text}")
|
|
raise Exception(f"Invalid JSON response: {response.text}")
|
|
|
|
# Check for API errors
|
|
if response.status_code != 200:
|
|
error_msg = result.get('retMsg', f'HTTP {response.status_code}')
|
|
logger.error(f"API Error: {error_msg}")
|
|
raise Exception(f"Bybit API Error: {error_msg}")
|
|
|
|
if result.get('retCode') != 0:
|
|
error_msg = result.get('retMsg', 'Unknown error')
|
|
error_code = result.get('retCode', 'Unknown')
|
|
logger.error(f"Bybit Error {error_code}: {error_msg}")
|
|
raise Exception(f"Bybit Error {error_code}: {error_msg}")
|
|
|
|
return result
|
|
|
|
def get_server_time(self) -> Dict[str, Any]:
|
|
"""Get server time (public endpoint)."""
|
|
return self._make_request('GET', '/v5/market/time')
|
|
|
|
def get_account_info(self) -> Dict[str, Any]:
|
|
"""Get account information (private endpoint)."""
|
|
return self._make_request('GET', '/v5/account/wallet-balance',
|
|
{'accountType': 'UNIFIED'}, signed=True)
|
|
|
|
def get_ticker(self, symbol: str, category: str = "linear") -> Dict[str, Any]:
|
|
"""Get ticker information.
|
|
|
|
Args:
|
|
symbol: Trading symbol (e.g., BTCUSDT)
|
|
category: Product category (linear, inverse, spot, option)
|
|
"""
|
|
params = {'category': category, 'symbol': symbol}
|
|
return self._make_request('GET', '/v5/market/tickers', params)
|
|
|
|
def get_orderbook(self, symbol: str, category: str = "linear", limit: int = 25) -> Dict[str, Any]:
|
|
"""Get orderbook data.
|
|
|
|
Args:
|
|
symbol: Trading symbol
|
|
category: Product category
|
|
limit: Number of price levels (max 200)
|
|
"""
|
|
params = {'category': category, 'symbol': symbol, 'limit': min(limit, 200)}
|
|
return self._make_request('GET', '/v5/market/orderbook', params)
|
|
|
|
def get_positions(self, category: str = "linear", symbol: str = None) -> Dict[str, Any]:
|
|
"""Get position information.
|
|
|
|
Args:
|
|
category: Product category
|
|
symbol: Trading symbol (optional)
|
|
"""
|
|
params = {'category': category}
|
|
if symbol:
|
|
params['symbol'] = symbol
|
|
return self._make_request('GET', '/v5/position/list', params, signed=True)
|
|
|
|
def get_open_orders(self, category: str = "linear", symbol: str = None) -> Dict[str, Any]:
|
|
"""Get open orders with caching.
|
|
|
|
Args:
|
|
category: Product category
|
|
symbol: Trading symbol (optional)
|
|
"""
|
|
params = {'category': category, 'openOnly': True}
|
|
if symbol:
|
|
params['symbol'] = symbol
|
|
return self._make_request('GET', '/v5/order/realtime', params, signed=True)
|
|
|
|
def place_order(self, category: str, symbol: str, side: str, order_type: str,
|
|
qty: str, price: str = None, **kwargs) -> Dict[str, Any]:
|
|
"""Place an order.
|
|
|
|
Args:
|
|
category: Product category (linear, inverse, spot, option)
|
|
symbol: Trading symbol
|
|
side: Buy or Sell
|
|
order_type: Market, Limit, etc.
|
|
qty: Order quantity as string
|
|
price: Order price as string (for limit orders)
|
|
**kwargs: Additional order parameters
|
|
"""
|
|
params = {
|
|
'category': category,
|
|
'symbol': symbol,
|
|
'side': side,
|
|
'orderType': order_type,
|
|
'qty': qty
|
|
}
|
|
|
|
if price:
|
|
params['price'] = price
|
|
|
|
# Add additional parameters
|
|
params.update(kwargs)
|
|
|
|
return self._make_request('POST', '/v5/order/create', params, signed=True)
|
|
|
|
def cancel_order(self, category: str, symbol: str, order_id: str = None,
|
|
order_link_id: str = None) -> Dict[str, Any]:
|
|
"""Cancel an order.
|
|
|
|
Args:
|
|
category: Product category
|
|
symbol: Trading symbol
|
|
order_id: Order ID
|
|
order_link_id: Order link ID (alternative to order_id)
|
|
"""
|
|
params = {'category': category, 'symbol': symbol}
|
|
|
|
if order_id:
|
|
params['orderId'] = order_id
|
|
elif order_link_id:
|
|
params['orderLinkId'] = order_link_id
|
|
else:
|
|
raise ValueError("Either order_id or order_link_id must be provided")
|
|
|
|
return self._make_request('POST', '/v5/order/cancel', params, signed=True)
|
|
|
|
def get_instruments_info(self, category: str = "linear", symbol: str = None) -> Dict[str, Any]:
|
|
"""Get instruments information.
|
|
|
|
Args:
|
|
category: Product category
|
|
symbol: Trading symbol (optional)
|
|
"""
|
|
params = {'category': category}
|
|
if symbol:
|
|
params['symbol'] = symbol
|
|
return self._make_request('GET', '/v5/market/instruments-info', params)
|
|
|
|
def test_connectivity(self) -> bool:
|
|
"""Test API connectivity.
|
|
|
|
Returns:
|
|
True if connected successfully
|
|
"""
|
|
try:
|
|
result = self.get_server_time()
|
|
logger.info("✅ Bybit REST API connectivity test successful")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"❌ Bybit REST API connectivity test failed: {e}")
|
|
return False
|
|
|
|
def test_authentication(self) -> bool:
|
|
"""Test API authentication.
|
|
|
|
Returns:
|
|
True if authentication successful
|
|
"""
|
|
try:
|
|
result = self.get_account_info()
|
|
logger.info("✅ Bybit REST API authentication test successful")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"❌ Bybit REST API authentication test failed: {e}")
|
|
return False |