Files
gogo2/NN/exchanges/bybit_interface.py
Dobromir Popov 02804ee64f bybit REST api
2025-07-14 22:57:02 +03:00

742 lines
29 KiB
Python

"""
Bybit Interface
"""
import logging
import time
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime, timezone
import json
import os
try:
from pybit.unified_trading import HTTP
except ImportError:
HTTP = None
logging.warning("pybit not installed. Run: pip install pybit")
from .exchange_interface import ExchangeInterface
from .bybit_rest_client import BybitRestClient
logger = logging.getLogger(__name__)
class BybitInterface(ExchangeInterface):
"""Bybit Exchange API Interface for cryptocurrency derivatives trading.
Supports both testnet and live trading environments.
Focus on USDT perpetuals and spot trading.
"""
def __init__(self, api_key: str = "", api_secret: str = "", test_mode: bool = True):
    """Create a Bybit interface that is not yet connected.

    The pybit session and the raw REST client both start as None; they are
    created later by connect().

    Args:
        api_key: Bybit API key. When empty, the BYBIT_API_KEY environment
            variable is used instead.
        api_secret: Bybit API secret. When empty, the BYBIT_API_SECRET
            environment variable is used instead.
        test_mode: If True, use testnet environment
    """
    super().__init__(api_key, api_secret, test_mode)

    # API clients (populated by connect())
    self.session = None
    self.rest_client = None  # Raw REST client fallback
    self.use_fallback = False  # Track if we should use REST client

    # Market scope
    self.category = "linear"  # Default to USDT perpetuals
    self.supported_symbols = set()

    # Short-lived open-orders cache to reduce API calls and rate limiting
    self._cache_timeout = 5  # seconds
    self._open_orders_cache_time = 0
    self._open_orders_cache = {}

    # Credentials that were not passed in come from the environment
    for supplied, attribute, env_name in (
        (api_key, 'api_key', 'BYBIT_API_KEY'),
        (api_secret, 'api_secret', 'BYBIT_API_SECRET'),
    ):
        if not supplied:
            setattr(self, attribute, os.environ.get(env_name, ''))

    logger.info(f"Initialized BybitInterface (testnet: {test_mode})")
def connect(self) -> bool:
    """Open the Bybit API clients and verify that one of them works.

    A pybit session and a raw REST client are both created. The pybit
    session is probed first with a wallet-balance request; if that fails
    for any reason, the REST client's connectivity and authentication
    checks are tried instead. Whichever client succeeds is recorded in
    self.use_fallback, and the instrument list is loaded.

    Returns:
        bool: True if connection successful, False otherwise
    """
    try:
        if HTTP is None:
            logger.error("pybit library not installed")
            return False

        if not (self.api_key and self.api_secret):
            logger.error("API key and secret required for Bybit connection")
            return False

        credentials = {'api_key': self.api_key, 'api_secret': self.api_secret}
        # Primary client: pybit session
        self.session = HTTP(testnet=self.test_mode, **credentials)
        # Secondary client: raw REST fallback
        self.rest_client = BybitRestClient(testnet=self.test_mode, **credentials)

        # Probe pybit first; any failure here drops through to the fallback
        try:
            wallet_reply = self.session.get_wallet_balance(accountType="UNIFIED")
            if wallet_reply.get('retCode') != 0:
                logger.warning(f"pybit connection failed: {wallet_reply}")
                raise Exception("pybit connection failed")
            logger.info(f"Successfully connected to Bybit via pybit (testnet: {self.test_mode})")
            self.use_fallback = False
            self._load_instruments()
            return True
        except Exception as pybit_error:
            logger.warning(f"pybit failed, trying REST client fallback: {pybit_error}")

        # pybit did not work: the REST client must pass both checks
        rest_ready = (self.rest_client.test_connectivity()
                      and self.rest_client.test_authentication())
        if not rest_ready:
            logger.error("Both pybit and REST client failed")
            return False

        logger.info(f"Successfully connected to Bybit via REST client fallback (testnet: {self.test_mode})")
        self.use_fallback = True
        self._load_instruments()
        return True
    except Exception as e:
        logger.error(f"Error connecting to Bybit: {e}")
        return False
def _load_instruments(self) -> None:
"""Load available trading instruments."""
try:
instruments_response = self.session.get_instruments_info(category=self.category)
if instruments_response.get('retCode') == 0:
instruments = instruments_response.get('result', {}).get('list', [])
self.supported_symbols = {instr['symbol'] for instr in instruments}
logger.info(f"Loaded {len(self.supported_symbols)} instruments")
else:
logger.warning(f"Failed to load instruments: {instruments_response}")
except Exception as e:
logger.warning(f"Error loading instruments: {e}")
def get_instruments(self, category: str = "linear") -> List[Dict[str, Any]]:
    """Fetch the instrument list for one Bybit product category.

    Args:
        category: Instrument category (linear, spot, inverse, option)

    Returns:
        The instrument dictionaries exactly as returned by the API, or an
        empty list when the request fails.
    """
    try:
        reply = self.session.get_instruments_info(category=category)
        if reply.get('retCode') != 0:
            logger.error(f"Failed to get instruments: {reply}")
            return []
        payload = reply.get('result', {})
        return payload.get('list', [])
    except Exception as e:
        logger.error(f"Error getting instruments: {e}")
        return []
def get_balance(self, asset: str) -> float:
    """Get balance of a specific asset.

    Reads the UNIFIED wallet and returns the coin's 'availableToWithdraw'
    amount. Bybit can return that field as an empty string (it is
    documented as deprecated for UNIFIED accounts); previously float('')
    raised and the balance was always reported as 0.0. In that case the
    amount is now approximated as walletBalance - locked, floored at 0.

    Args:
        asset: Asset symbol (e.g., 'BTC', 'USDT'); matched case-insensitively

    Returns:
        float: Available balance of the asset, or 0.0 when the asset is not
        present or the request fails
    """
    def _to_float(raw: Any) -> Optional[float]:
        """Parse a numeric API field; None when blank, missing or malformed."""
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    try:
        account_info = self.session.get_wallet_balance(accountType="UNIFIED")
        if account_info.get('retCode') != 0:
            logger.error(f"Failed to get account balance: {account_info}")
            return 0.0

        wanted = asset.upper()
        for account in account_info.get('result', {}).get('list', []):
            for coin in account.get('coin', []):
                if coin.get('coin', '').upper() != wanted:
                    continue
                available_balance = _to_float(coin.get('availableToWithdraw'))
                if available_balance is None:
                    # Field is blank: fall back to an approximation from
                    # the fields that are still populated.
                    wallet = _to_float(coin.get('walletBalance')) or 0.0
                    locked = _to_float(coin.get('locked')) or 0.0
                    available_balance = max(wallet - locked, 0.0)
                logger.debug(f"Balance for {asset}: {available_balance}")
                return available_balance

        logger.debug(f"No balance found for asset {asset}")
        return 0.0
    except Exception as e:
        logger.error(f"Error getting balance for {asset}: {e}")
        return 0.0
def get_account_summary(self) -> Dict[str, Any]:
    """Return the raw UNIFIED wallet-balance response.

    Returns:
        The unmodified API response dictionary when its retCode is 0,
        otherwise an empty dictionary (the failure is logged).
    """
    try:
        summary = self.session.get_wallet_balance(accountType="UNIFIED")
        succeeded = summary.get('retCode') == 0
        if not succeeded:
            logger.error(f"Failed to get account summary: {summary}")
        return summary if succeeded else {}
    except Exception as e:
        logger.error(f"Error getting account summary: {e}")
        return {}
def get_all_balances(self) -> Dict[str, Dict[str, float]]:
    """Get all account balances in the format expected by trading executor.

    Numeric fields are parsed tolerantly: Bybit can return an empty string
    for a field (notably 'availableToWithdraw' on UNIFIED accounts), and a
    single blank value previously made float('') raise, discarding every
    balance. A blank 'availableToWithdraw' is approximated as
    walletBalance - locked (floored at 0); a blank 'locked' counts as 0.

    Returns:
        Dictionary with asset balances in format:
        {asset: {'free': float, 'locked': float, 'total': float}}, where
        total is free + locked. Empty when the request fails.
    """
    def _to_float(raw: Any) -> Optional[float]:
        """Parse a numeric API field; None when blank, missing or malformed."""
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    try:
        account_info = self.session.get_wallet_balance(accountType="UNIFIED")
        if account_info.get('retCode') != 0:
            logger.error(f"Failed to get all balances: {account_info}")
            return {}

        balances = {}
        for account in account_info.get('result', {}).get('list', []):
            for coin in account.get('coin', []):
                asset = coin.get('coin', '')
                if not asset:
                    continue
                # Convert Bybit balance format to MEXC-compatible format
                locked = _to_float(coin.get('locked')) or 0.0
                available = _to_float(coin.get('availableToWithdraw'))
                if available is None:
                    wallet = _to_float(coin.get('walletBalance')) or 0.0
                    available = max(wallet - locked, 0.0)
                balances[asset] = {
                    'free': available,
                    'locked': locked,
                    'total': available + locked,
                }

        logger.debug(f"Retrieved balances for {len(balances)} assets")
        return balances
    except Exception as e:
        logger.error(f"Error getting all balances: {e}")
        return {}
def get_ticker(self, symbol: str) -> Dict[str, Any]:
    """Get ticker information for a symbol.

    The last traded price is also stored in self.last_price_cache under the
    symbol exactly as the caller passed it.

    Args:
        symbol: Trading symbol (e.g., 'BTCUSDT')

    Returns:
        Dictionary with keys symbol, last_price, bid_price, ask_price,
        volume_24h, change_24h, high_24h, low_24h and timestamp, or an empty
        dictionary when the request fails or returns no ticker.
    """
    try:
        formatted_symbol = self._format_symbol(symbol)
        ticker_response = self.session.get_tickers(
            category=self.category,
            symbol=formatted_symbol,
        )
        if ticker_response.get('retCode') != 0:
            logger.error(f"Failed to get ticker for {symbol}: {ticker_response}")
            return {}

        ticker_data = ticker_response.get('result', {}).get('list', [])
        if not ticker_data:
            logger.error(f"No ticker data for {symbol}")
            return {}

        ticker = ticker_data[0]
        # Cache the last price
        last_price = float(ticker.get('lastPrice', 0))
        self.last_price_cache[symbol] = last_price

        # Bybit v5 reports the server time on the response envelope, not on
        # the individual ticker entries, so reading it from the entry alone
        # always produced 0. Prefer a per-entry value if one ever appears.
        timestamp = int(ticker.get('time') or ticker_response.get('time') or 0)

        return {
            'symbol': symbol,
            'last_price': last_price,
            'bid_price': float(ticker.get('bid1Price', 0)),
            'ask_price': float(ticker.get('ask1Price', 0)),
            'volume_24h': float(ticker.get('volume24h', 0)),
            'change_24h': float(ticker.get('price24hPcnt', 0)),
            'high_24h': float(ticker.get('highPrice24h', 0)),
            'low_24h': float(ticker.get('lowPrice24h', 0)),
            'timestamp': timestamp,
        }
    except Exception as e:
        logger.error(f"Error getting ticker for {symbol}: {e}")
        return {}
def place_order(self, symbol: str, side: str, order_type: str,
                quantity: float, price: float = None) -> Dict[str, Any]:
    """Place an order.

    Args:
        symbol: Trading symbol (e.g., 'BTCUSDT')
        side: 'buy' or 'sell'
        order_type: 'market' or 'limit'
        quantity: Order quantity
        price: Order price (required for limit orders)

    Returns:
        Dictionary with order information (order_id, symbol, side, type,
        quantity, price, status, timestamp) on success, or
        {'error': message} when the order is rejected or cannot be sent.
    """
    from decimal import Decimal

    def _plain(number: Any) -> str:
        """Render a number in fixed-point notation.

        str() on a small or large float yields scientific notation
        (str(0.00001) == '1e-05'); fixed-point text is what should be sent
        as an order quantity or price.
        """
        return format(Decimal(str(number)), 'f')

    try:
        is_limit = order_type.lower() == 'limit'
        if is_limit and price is None:
            # Fail fast instead of sending a limit order the exchange
            # cannot accept.
            error_msg = 'Price is required for limit orders'
            logger.error(f"Failed to place order: {error_msg}")
            return {'error': error_msg}

        formatted_symbol = self._format_symbol(symbol)
        bybit_side = side.capitalize()  # 'Buy' or 'Sell'
        bybit_order_type = self._map_order_type(order_type)

        order_params = {
            'category': self.category,
            'symbol': formatted_symbol,
            'side': bybit_side,
            'orderType': bybit_order_type,
            'qty': _plain(quantity),
        }
        if is_limit:
            order_params['price'] = _plain(price)
            order_params['timeInForce'] = 'GTC'  # Good Till Cancelled

        response = self.session.place_order(**order_params)
        if response.get('retCode') != 0:
            error_msg = response.get('retMsg', 'Unknown error')
            logger.error(f"Failed to place order: {error_msg}")
            return {'error': error_msg}

        # The cached open-orders lists no longer reflect the account.
        self._open_orders_cache.clear()

        result = response.get('result', {})
        order_info = {
            'order_id': result.get('orderId'),
            'symbol': symbol,
            'side': side,
            'type': order_type,
            'quantity': quantity,
            'price': price,
            'status': 'submitted',
            'timestamp': int(time.time() * 1000),
        }
        logger.info(f"Successfully placed {order_type} {side} order for {quantity} {symbol}")
        return order_info
    except Exception as e:
        logger.error(f"Error placing order: {e}")
        return {'error': str(e)}
def _process_pybit_orders(self, orders_list: List[Dict]) -> List[Dict[str, Any]]:
"""Process orders from pybit response format."""
open_orders = []
for order in orders_list:
order_info = {
'order_id': order.get('orderId'),
'symbol': order.get('symbol'),
'side': order.get('side', '').lower(),
'type': order.get('orderType', '').lower(),
'quantity': float(order.get('qty', 0)),
'filled_quantity': float(order.get('cumExecQty', 0)),
'price': float(order.get('price', 0)),
'status': self._map_order_status(order.get('orderStatus', '')),
'timestamp': int(order.get('createdTime', 0))
}
open_orders.append(order_info)
return open_orders
def _process_rest_orders(self, orders_list: List[Dict]) -> List[Dict[str, Any]]:
"""Process orders from REST client response format."""
# REST client returns same format as pybit, so we can reuse the method
return self._process_pybit_orders(orders_list)
def cancel_order(self, symbol: str, order_id: str) -> bool:
    """Cancel an order.

    After a successful cancellation the open-orders cache is emptied, so
    the next get_open_orders() call does not keep reporting the cancelled
    order until the cache times out.

    Args:
        symbol: Trading symbol
        order_id: Order ID to cancel

    Returns:
        bool: True if order was cancelled successfully
    """
    try:
        formatted_symbol = self._format_symbol(symbol)
        response = self.session.cancel_order(
            category=self.category,
            symbol=formatted_symbol,
            orderId=order_id,
        )
        if response.get('retCode') != 0:
            error_msg = response.get('retMsg', 'Unknown error')
            logger.error(f"Failed to cancel order {order_id}: {error_msg}")
            return False

        # Cached open-orders lists would still contain this order.
        self._open_orders_cache.clear()
        logger.info(f"Successfully cancelled order {order_id}")
        return True
    except Exception as e:
        logger.error(f"Error cancelling order {order_id}: {e}")
        return False
def get_order_status(self, symbol: str, order_id: str) -> Dict[str, Any]:
    """Get status of an order.

    The order is looked up among open orders first; when it is not there
    (it may already be filled or cancelled) the order history is consulted.

    Numeric fields are parsed tolerantly. Bybit returns an empty string for
    some of them (for example 'avgPrice' on an order with no fills), and
    float('') previously raised, so a resting, unfilled order was reported
    as {}. Blank values now become 0.

    Args:
        symbol: Trading symbol
        order_id: Order ID

    Returns:
        Dictionary with order status information (order_id, symbol, side,
        type, quantity, filled_quantity, price, average_price, status,
        timestamp), or an empty dictionary when the lookup fails.
    """
    def _num(raw: Any) -> float:
        """Parse a numeric API field, treating blank or missing as 0.0."""
        try:
            return float(raw)
        except (TypeError, ValueError):
            return 0.0

    try:
        formatted_symbol = self._format_symbol(symbol)
        response = self.session.get_open_orders(
            category=self.category,
            symbol=formatted_symbol,
            orderId=order_id,
        )
        if response.get('retCode') != 0:
            logger.error(f"Failed to get order status: {response}")
            return {}

        orders = response.get('result', {}).get('list', [])
        if not orders:
            # Order might be filled/cancelled, check order history
            return self._get_order_from_history(symbol, order_id)

        order = orders[0]
        return {
            'order_id': order.get('orderId'),
            'symbol': symbol,
            'side': order.get('side', '').lower(),
            'type': order.get('orderType', '').lower(),
            'quantity': _num(order.get('qty')),
            'filled_quantity': _num(order.get('cumExecQty')),
            'price': _num(order.get('price')),
            'average_price': _num(order.get('avgPrice')),
            'status': self._map_order_status(order.get('orderStatus', '')),
            'timestamp': int(order.get('createdTime') or 0),
        }
    except Exception as e:
        logger.error(f"Error getting order status for {order_id}: {e}")
        return {}
def _get_order_from_history(self, symbol: str, order_id: str) -> Dict[str, Any]:
"""Get order from order history (for filled/cancelled orders)."""
try:
formatted_symbol = self._format_symbol(symbol)
response = self.session.get_order_history(
category=self.category,
symbol=formatted_symbol,
orderId=order_id
)
if response.get('retCode') == 0:
orders = response.get('result', {}).get('list', [])
if orders:
order = orders[0]
return {
'order_id': order.get('orderId'),
'symbol': symbol,
'side': order.get('side', '').lower(),
'type': order.get('orderType', '').lower(),
'quantity': float(order.get('qty', 0)),
'filled_quantity': float(order.get('cumExecQty', 0)),
'price': float(order.get('price', 0)),
'average_price': float(order.get('avgPrice', 0)),
'status': self._map_order_status(order.get('orderStatus', '')),
'timestamp': int(order.get('createdTime', 0))
}
return {}
except Exception as e:
logger.error(f"Error getting order from history: {e}")
return {}
def get_open_orders(self, symbol: str = None) -> List[Dict[str, Any]]:
    """Get open orders with caching and fallback to REST client.

    Results are cached per symbol (or under 'all') for self._cache_timeout
    seconds. The pybit session is used unless self.use_fallback is set; if
    pybit fails, the raw REST client is tried, and an error that looks like
    Bybit's 10016 / "System error" switches the instance to the REST client
    permanently.

    Fixes over the previous version:
      * freshness is tracked per cache key. A single shared timestamp meant
        that refreshing one symbol made every other (possibly much older)
        cached entry look fresh again;
      * 'openOnly' is sent as 0 (Bybit documents it as an integer where 0
        selects open orders only) instead of the boolean True;
      * when no symbol is given for the 'linear' category, settleCoin=USDT
        is sent, because Bybit requires symbol, baseCoin or settleCoin for
        linear order queries;
      * a REST reply with a non-zero retCode is no longer cached as an
        empty order list.

    Args:
        symbol: Trading symbol (optional, gets all if None)

    Returns:
        List of open order dictionaries (empty on failure)
    """
    try:
        current_time = time.time()
        cache_key = symbol or 'all'

        # Per-key refresh times; created lazily so this method does not
        # depend on the attribute having been set up elsewhere.
        cache_times = getattr(self, '_open_orders_cache_times', None)
        if cache_times is None:
            cache_times = self._open_orders_cache_times = {}

        # Check if we have fresh cached data for this particular key
        if (cache_key in self._open_orders_cache and
                current_time - cache_times.get(cache_key, 0) < self._cache_timeout):
            logger.debug(f"Returning cached open orders for {cache_key}")
            return self._open_orders_cache[cache_key]

        def _remember(orders: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
            """Store a freshly fetched order list together with its fetch time."""
            self._open_orders_cache[cache_key] = orders
            cache_times[cache_key] = current_time
            # Kept up to date for any code that still reads the legacy attribute.
            self._open_orders_cache_time = current_time
            return orders

        # Try pybit first if not using fallback
        if not self.use_fallback and self.session:
            try:
                params = {
                    'category': self.category,
                    'openOnly': 0,
                }
                if symbol:
                    params['symbol'] = self._format_symbol(symbol)
                elif self.category == 'linear':
                    params['settleCoin'] = 'USDT'

                response = self.session.get_open_orders(**params)
                if response.get('retCode') == 0:
                    orders = _remember(self._process_pybit_orders(
                        response.get('result', {}).get('list', [])))
                    logger.debug(f"Found {len(orders)} open orders via pybit, cached for {self._cache_timeout}s")
                    return orders
                logger.warning(f"pybit get_open_orders failed: {response}")
                raise Exception("pybit failed")
            except Exception as e:
                error_str = str(e)
                if "10016" in error_str or "System error" in error_str:
                    logger.warning(f"pybit rate limited (Error 10016), switching to REST fallback: {e}")
                    self.use_fallback = True
                else:
                    logger.warning(f"pybit get_open_orders error, trying REST fallback: {e}")

        # Use REST client (either as primary or fallback)
        if not self.rest_client:
            logger.error("No available API client (pybit or REST)")
            return []

        formatted_symbol = self._format_symbol(symbol) if symbol else None
        response = self.rest_client.get_open_orders(self.category, formatted_symbol)
        # NOTE(review): assumes the REST client returns the raw Bybit
        # envelope; a reply without 'retCode' is treated as success, as before.
        if response.get('retCode', 0) != 0:
            logger.error(f"REST get_open_orders failed: {response}")
            return []
        orders = _remember(self._process_rest_orders(
            response.get('result', {}).get('list', [])))
        logger.debug(f"Found {len(orders)} open orders via REST client, cached for {self._cache_timeout}s")
        return orders
    except Exception as e:
        logger.error(f"Error getting open orders: {e}")
        return []
def get_positions(self, symbol: str = None) -> List[Dict[str, Any]]:
    """Get position information.

    Only positions with a non-zero size are returned.

    When no symbol is given and the category is 'linear', the query is sent
    with settleCoin=USDT: Bybit requires either a symbol or a settle coin
    for linear position queries, so the previous symbol-less request could
    not succeed.

    Args:
        symbol: Trading symbol (optional, gets all if None)

    Returns:
        List of position dictionaries with keys symbol, side, size,
        entry_price, mark_price, unrealized_pnl, percentage, leverage and
        timestamp (empty on failure)
    """
    try:
        params = {'category': self.category}
        if symbol:
            params['symbol'] = self._format_symbol(symbol)
        elif self.category == 'linear':
            params['settleCoin'] = 'USDT'

        response = self.session.get_positions(**params)
        if response.get('retCode') != 0:
            logger.error(f"Failed to get positions: {response}")
            return []

        position_list = []
        for pos in response.get('result', {}).get('list', []):
            # Only include positions with non-zero size
            size = float(pos.get('size', 0))
            if size == 0:
                continue
            position_list.append({
                'symbol': pos.get('symbol'),
                'side': pos.get('side', '').lower(),
                'size': size,
                'entry_price': float(pos.get('avgPrice', 0)),
                'mark_price': float(pos.get('markPrice', 0)),
                'unrealized_pnl': float(pos.get('unrealisedPnl', 0)),
                # NOTE(review): 'unrealisedPnlPct' may not be present in the
                # API response, in which case this is always 0 - confirm.
                'percentage': float(pos.get('unrealisedPnlPct', 0)),
                'leverage': float(pos.get('leverage', 0)),
                'timestamp': int(pos.get('updatedTime', 0)),
            })

        logger.debug(f"Found {len(position_list)} positions")
        return position_list
    except Exception as e:
        logger.error(f"Error getting positions: {e}")
        return []
def _format_symbol(self, symbol: str) -> str:
"""Format symbol for Bybit API.
Args:
symbol: Symbol in various formats
Returns:
Formatted symbol for Bybit
"""
# Remove any separators and convert to uppercase
clean_symbol = symbol.replace('/', '').replace('-', '').replace('_', '').upper()
# Common mappings
symbol_mapping = {
'BTCUSDT': 'BTCUSDT',
'ETHUSDT': 'ETHUSDT',
'BTCUSD': 'BTCUSDT', # Map to USDT perpetual
'ETHUSD': 'ETHUSDT', # Map to USDT perpetual
}
return symbol_mapping.get(clean_symbol, clean_symbol)
def _map_order_type(self, order_type: str) -> str:
"""Map order type to Bybit format.
Args:
order_type: Order type ('market', 'limit')
Returns:
Bybit order type
"""
type_mapping = {
'market': 'Market',
'limit': 'Limit',
'stop': 'Stop',
'stop_limit': 'StopLimit'
}
return type_mapping.get(order_type.lower(), 'Market')
def _map_order_status(self, status: str) -> str:
"""Map Bybit order status to standard format.
Args:
status: Bybit order status
Returns:
Standardized order status
"""
status_mapping = {
'New': 'open',
'PartiallyFilled': 'partially_filled',
'Filled': 'filled',
'Cancelled': 'cancelled',
'Rejected': 'rejected',
'PartiallyFilledCanceled': 'cancelled'
}
return status_mapping.get(status, status.lower())
def get_orderbook(self, symbol: str, depth: int = 25) -> Dict[str, Any]:
    """Fetch the current order book for a symbol.

    Args:
        symbol: Trading symbol
        depth: Number of price levels to request; values above 200 are
            capped at 200

    Returns:
        Dictionary with 'symbol', 'bids' and 'asks' (lists of
        [price, quantity] float pairs) and an integer 'timestamp', or an
        empty dictionary when the request fails.
    """
    try:
        bybit_symbol = self._format_symbol(symbol)
        level_limit = min(depth, 200)  # Bybit max limit is 200
        reply = self.session.get_orderbook(
            category=self.category,
            symbol=bybit_symbol,
            limit=level_limit,
        )
        if reply.get('retCode') != 0:
            logger.error(f"Failed to get orderbook for {symbol}: {reply}")
            return {}

        book = reply.get('result', {})
        # 'b' holds the buy side and 'a' the sell side of the book
        bid_levels = [[float(level[0]), float(level[1])] for level in book.get('b', [])]
        ask_levels = [[float(level[0]), float(level[1])] for level in book.get('a', [])]
        return {
            'symbol': symbol,
            'bids': bid_levels,
            'asks': ask_levels,
            'timestamp': int(book.get('ts', 0)),
        }
    except Exception as e:
        logger.error(f"Error getting orderbook for {symbol}: {e}")
        return {}
def close_position(self, symbol: str, quantity: float = None) -> Dict[str, Any]:
    """Close a position (market order in opposite direction).

    The first position returned by get_positions(symbol) is closed with a
    market order on the opposite side.

    Safety checks added over the previous version:
      * a requested quantity larger than the position is capped at the
        position size, so the closing order cannot open a position in the
        opposite direction;
      * a zero or negative quantity is rejected;
      * a position whose side is neither 'buy' nor 'sell' is rejected
        instead of being answered with a 'buy' order.

    Args:
        symbol: Trading symbol
        quantity: Quantity to close (None for full position)

    Returns:
        Dictionary with order information as returned by place_order(), or
        {'error': message} when there is nothing valid to close.
    """
    try:
        # Get current position
        positions = self.get_positions(symbol)
        if not positions:
            logger.warning(f"No position found for {symbol}")
            return {'error': 'No position found'}

        position = positions[0]
        position_size = abs(position['size'])
        position_side = position['side']

        # Determine opposite side
        opposite_sides = {'buy': 'sell', 'sell': 'buy'}
        if position_side not in opposite_sides:
            logger.error(f"Cannot close {symbol}: unknown position side {position_side!r}")
            return {'error': f'Unknown position side: {position_side}'}
        close_side = opposite_sides[position_side]

        # Determine close quantity
        if quantity is None:
            close_quantity = position_size
        elif quantity <= 0:
            logger.error(f"Cannot close {symbol}: invalid quantity {quantity}")
            return {'error': 'Close quantity must be positive'}
        else:
            close_quantity = min(quantity, position_size)

        # Place market order to close position
        return self.place_order(
            symbol=symbol,
            side=close_side,
            order_type='market',
            quantity=close_quantity,
        )
    except Exception as e:
        logger.error(f"Error closing position for {symbol}: {e}")
        return {'error': str(e)}