621 lines
23 KiB
Python
621 lines
23 KiB
Python
import logging
|
|
import time
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
from datetime import datetime, timezone
|
|
import json
|
|
import os
|
|
|
|
try:
|
|
from pybit.unified_trading import HTTP
|
|
except ImportError:
|
|
HTTP = None
|
|
logging.warning("pybit not installed. Run: pip install pybit")
|
|
|
|
from .exchange_interface import ExchangeInterface
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BybitInterface(ExchangeInterface):
|
|
"""Bybit Exchange API Interface for cryptocurrency derivatives trading.
|
|
|
|
Supports both testnet and live trading environments.
|
|
Focus on USDT perpetuals and spot trading.
|
|
"""
|
|
|
|
def __init__(self, api_key: str = "", api_secret: str = "", test_mode: bool = True):
|
|
"""Initialize Bybit exchange interface.
|
|
|
|
Args:
|
|
api_key: Bybit API key
|
|
api_secret: Bybit API secret
|
|
test_mode: If True, use testnet environment
|
|
"""
|
|
super().__init__(api_key, api_secret, test_mode)
|
|
|
|
# Bybit-specific settings
|
|
self.session = None
|
|
self.category = "linear" # Default to USDT perpetuals
|
|
self.supported_symbols = set()
|
|
|
|
# Load credentials from environment if not provided
|
|
if not api_key:
|
|
self.api_key = os.getenv('BYBIT_API_KEY', '')
|
|
if not api_secret:
|
|
self.api_secret = os.getenv('BYBIT_API_SECRET', '')
|
|
|
|
logger.info(f"Initialized BybitInterface (testnet: {test_mode})")
|
|
|
|
def connect(self) -> bool:
|
|
"""Connect to Bybit API.
|
|
|
|
Returns:
|
|
bool: True if connection successful, False otherwise
|
|
"""
|
|
try:
|
|
if HTTP is None:
|
|
logger.error("pybit library not installed")
|
|
return False
|
|
|
|
if not self.api_key or not self.api_secret:
|
|
logger.error("API key and secret required for Bybit connection")
|
|
return False
|
|
|
|
# Create HTTP session
|
|
self.session = HTTP(
|
|
testnet=self.test_mode,
|
|
api_key=self.api_key,
|
|
api_secret=self.api_secret,
|
|
)
|
|
|
|
# Test connection by getting account info
|
|
account_info = self.session.get_wallet_balance(accountType="UNIFIED")
|
|
if account_info.get('retCode') == 0:
|
|
logger.info(f"Successfully connected to Bybit (testnet: {self.test_mode})")
|
|
self._load_instruments()
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to connect to Bybit: {account_info}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error connecting to Bybit: {e}")
|
|
return False
|
|
|
|
def _load_instruments(self) -> None:
|
|
"""Load available trading instruments."""
|
|
try:
|
|
instruments_response = self.session.get_instruments_info(category=self.category)
|
|
if instruments_response.get('retCode') == 0:
|
|
instruments = instruments_response.get('result', {}).get('list', [])
|
|
self.supported_symbols = {instr['symbol'] for instr in instruments}
|
|
logger.info(f"Loaded {len(self.supported_symbols)} instruments")
|
|
else:
|
|
logger.warning(f"Failed to load instruments: {instruments_response}")
|
|
except Exception as e:
|
|
logger.warning(f"Error loading instruments: {e}")
|
|
|
|
def get_instruments(self, category: str = "linear") -> List[Dict[str, Any]]:
|
|
"""Get available trading instruments.
|
|
|
|
Args:
|
|
category: Instrument category (linear, spot, inverse, option)
|
|
|
|
Returns:
|
|
List of instrument dictionaries
|
|
"""
|
|
try:
|
|
response = self.session.get_instruments_info(category=category)
|
|
if response.get('retCode') == 0:
|
|
return response.get('result', {}).get('list', [])
|
|
else:
|
|
logger.error(f"Failed to get instruments: {response}")
|
|
return []
|
|
except Exception as e:
|
|
logger.error(f"Error getting instruments: {e}")
|
|
return []
|
|
|
|
def get_balance(self, asset: str) -> float:
|
|
"""Get balance of a specific asset.
|
|
|
|
Args:
|
|
asset: Asset symbol (e.g., 'BTC', 'USDT')
|
|
|
|
Returns:
|
|
float: Available balance of the asset
|
|
"""
|
|
try:
|
|
account_info = self.session.get_wallet_balance(accountType="UNIFIED")
|
|
if account_info.get('retCode') == 0:
|
|
balances = account_info.get('result', {}).get('list', [])
|
|
|
|
for account in balances:
|
|
coins = account.get('coin', [])
|
|
for coin in coins:
|
|
if coin.get('coin', '').upper() == asset.upper():
|
|
available_balance = float(coin.get('availableToWithdraw', 0))
|
|
logger.debug(f"Balance for {asset}: {available_balance}")
|
|
return available_balance
|
|
|
|
logger.debug(f"No balance found for asset {asset}")
|
|
return 0.0
|
|
else:
|
|
logger.error(f"Failed to get account balance: {account_info}")
|
|
return 0.0
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting balance for {asset}: {e}")
|
|
return 0.0
|
|
|
|
def get_account_summary(self) -> Dict[str, Any]:
|
|
"""Get account summary with all balances and positions.
|
|
|
|
Returns:
|
|
Dictionary with account information
|
|
"""
|
|
try:
|
|
account_info = self.session.get_wallet_balance(accountType="UNIFIED")
|
|
if account_info.get('retCode') == 0:
|
|
return account_info
|
|
else:
|
|
logger.error(f"Failed to get account summary: {account_info}")
|
|
return {}
|
|
except Exception as e:
|
|
logger.error(f"Error getting account summary: {e}")
|
|
return {}
|
|
|
|
def get_ticker(self, symbol: str) -> Dict[str, Any]:
|
|
"""Get ticker information for a symbol.
|
|
|
|
Args:
|
|
symbol: Trading symbol (e.g., 'BTCUSDT')
|
|
|
|
Returns:
|
|
Dictionary with ticker information
|
|
"""
|
|
try:
|
|
formatted_symbol = self._format_symbol(symbol)
|
|
|
|
ticker_response = self.session.get_tickers(
|
|
category=self.category,
|
|
symbol=formatted_symbol
|
|
)
|
|
|
|
if ticker_response.get('retCode') == 0:
|
|
ticker_data = ticker_response.get('result', {}).get('list', [])
|
|
if ticker_data:
|
|
ticker = ticker_data[0]
|
|
|
|
# Cache the last price
|
|
last_price = float(ticker.get('lastPrice', 0))
|
|
self.last_price_cache[symbol] = last_price
|
|
|
|
return {
|
|
'symbol': symbol,
|
|
'last_price': last_price,
|
|
'bid_price': float(ticker.get('bid1Price', 0)),
|
|
'ask_price': float(ticker.get('ask1Price', 0)),
|
|
'volume_24h': float(ticker.get('volume24h', 0)),
|
|
'change_24h': float(ticker.get('price24hPcnt', 0)),
|
|
'high_24h': float(ticker.get('highPrice24h', 0)),
|
|
'low_24h': float(ticker.get('lowPrice24h', 0)),
|
|
'timestamp': int(ticker.get('time', 0))
|
|
}
|
|
else:
|
|
logger.error(f"No ticker data for {symbol}")
|
|
return {}
|
|
else:
|
|
logger.error(f"Failed to get ticker for {symbol}: {ticker_response}")
|
|
return {}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting ticker for {symbol}: {e}")
|
|
return {}
|
|
|
|
def place_order(self, symbol: str, side: str, order_type: str,
|
|
quantity: float, price: float = None) -> Dict[str, Any]:
|
|
"""Place an order.
|
|
|
|
Args:
|
|
symbol: Trading symbol (e.g., 'BTCUSDT')
|
|
side: 'buy' or 'sell'
|
|
order_type: 'market' or 'limit'
|
|
quantity: Order quantity
|
|
price: Order price (required for limit orders)
|
|
|
|
Returns:
|
|
Dictionary with order information
|
|
"""
|
|
try:
|
|
formatted_symbol = self._format_symbol(symbol)
|
|
bybit_side = side.capitalize() # 'Buy' or 'Sell'
|
|
bybit_order_type = self._map_order_type(order_type)
|
|
|
|
order_params = {
|
|
'category': self.category,
|
|
'symbol': formatted_symbol,
|
|
'side': bybit_side,
|
|
'orderType': bybit_order_type,
|
|
'qty': str(quantity),
|
|
}
|
|
|
|
if order_type.lower() == 'limit' and price is not None:
|
|
order_params['price'] = str(price)
|
|
order_params['timeInForce'] = 'GTC' # Good Till Cancelled
|
|
|
|
response = self.session.place_order(**order_params)
|
|
|
|
if response.get('retCode') == 0:
|
|
result = response.get('result', {})
|
|
order_info = {
|
|
'order_id': result.get('orderId'),
|
|
'symbol': symbol,
|
|
'side': side,
|
|
'type': order_type,
|
|
'quantity': quantity,
|
|
'price': price,
|
|
'status': 'submitted',
|
|
'timestamp': int(time.time() * 1000)
|
|
}
|
|
|
|
logger.info(f"Successfully placed {order_type} {side} order for {quantity} {symbol}")
|
|
return order_info
|
|
else:
|
|
error_msg = response.get('retMsg', 'Unknown error')
|
|
logger.error(f"Failed to place order: {error_msg}")
|
|
return {'error': error_msg}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error placing order: {e}")
|
|
return {'error': str(e)}
|
|
|
|
def cancel_order(self, symbol: str, order_id: str) -> bool:
|
|
"""Cancel an order.
|
|
|
|
Args:
|
|
symbol: Trading symbol
|
|
order_id: Order ID to cancel
|
|
|
|
Returns:
|
|
bool: True if order was cancelled successfully
|
|
"""
|
|
try:
|
|
formatted_symbol = self._format_symbol(symbol)
|
|
|
|
response = self.session.cancel_order(
|
|
category=self.category,
|
|
symbol=formatted_symbol,
|
|
orderId=order_id
|
|
)
|
|
|
|
if response.get('retCode') == 0:
|
|
logger.info(f"Successfully cancelled order {order_id}")
|
|
return True
|
|
else:
|
|
error_msg = response.get('retMsg', 'Unknown error')
|
|
logger.error(f"Failed to cancel order {order_id}: {error_msg}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error cancelling order {order_id}: {e}")
|
|
return False
|
|
|
|
def get_order_status(self, symbol: str, order_id: str) -> Dict[str, Any]:
|
|
"""Get status of an order.
|
|
|
|
Args:
|
|
symbol: Trading symbol
|
|
order_id: Order ID
|
|
|
|
Returns:
|
|
Dictionary with order status information
|
|
"""
|
|
try:
|
|
formatted_symbol = self._format_symbol(symbol)
|
|
|
|
response = self.session.get_open_orders(
|
|
category=self.category,
|
|
symbol=formatted_symbol,
|
|
orderId=order_id
|
|
)
|
|
|
|
if response.get('retCode') == 0:
|
|
orders = response.get('result', {}).get('list', [])
|
|
if orders:
|
|
order = orders[0]
|
|
return {
|
|
'order_id': order.get('orderId'),
|
|
'symbol': symbol,
|
|
'side': order.get('side', '').lower(),
|
|
'type': order.get('orderType', '').lower(),
|
|
'quantity': float(order.get('qty', 0)),
|
|
'filled_quantity': float(order.get('cumExecQty', 0)),
|
|
'price': float(order.get('price', 0)),
|
|
'average_price': float(order.get('avgPrice', 0)),
|
|
'status': self._map_order_status(order.get('orderStatus', '')),
|
|
'timestamp': int(order.get('createdTime', 0))
|
|
}
|
|
else:
|
|
# Order might be filled/cancelled, check order history
|
|
return self._get_order_from_history(symbol, order_id)
|
|
else:
|
|
logger.error(f"Failed to get order status: {response}")
|
|
return {}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting order status for {order_id}: {e}")
|
|
return {}
|
|
|
|
def _get_order_from_history(self, symbol: str, order_id: str) -> Dict[str, Any]:
|
|
"""Get order from order history (for filled/cancelled orders)."""
|
|
try:
|
|
formatted_symbol = self._format_symbol(symbol)
|
|
|
|
response = self.session.get_order_history(
|
|
category=self.category,
|
|
symbol=formatted_symbol,
|
|
orderId=order_id
|
|
)
|
|
|
|
if response.get('retCode') == 0:
|
|
orders = response.get('result', {}).get('list', [])
|
|
if orders:
|
|
order = orders[0]
|
|
return {
|
|
'order_id': order.get('orderId'),
|
|
'symbol': symbol,
|
|
'side': order.get('side', '').lower(),
|
|
'type': order.get('orderType', '').lower(),
|
|
'quantity': float(order.get('qty', 0)),
|
|
'filled_quantity': float(order.get('cumExecQty', 0)),
|
|
'price': float(order.get('price', 0)),
|
|
'average_price': float(order.get('avgPrice', 0)),
|
|
'status': self._map_order_status(order.get('orderStatus', '')),
|
|
'timestamp': int(order.get('createdTime', 0))
|
|
}
|
|
|
|
return {}
|
|
except Exception as e:
|
|
logger.error(f"Error getting order from history: {e}")
|
|
return {}
|
|
|
|
def get_open_orders(self, symbol: str = None) -> List[Dict[str, Any]]:
|
|
"""Get open orders.
|
|
|
|
Args:
|
|
symbol: Trading symbol (optional, gets all if None)
|
|
|
|
Returns:
|
|
List of open order dictionaries
|
|
"""
|
|
try:
|
|
params = {
|
|
'category': self.category,
|
|
'openOnly': True
|
|
}
|
|
|
|
if symbol:
|
|
params['symbol'] = self._format_symbol(symbol)
|
|
|
|
response = self.session.get_open_orders(**params)
|
|
|
|
if response.get('retCode') == 0:
|
|
orders = response.get('result', {}).get('list', [])
|
|
|
|
open_orders = []
|
|
for order in orders:
|
|
order_info = {
|
|
'order_id': order.get('orderId'),
|
|
'symbol': order.get('symbol'),
|
|
'side': order.get('side', '').lower(),
|
|
'type': order.get('orderType', '').lower(),
|
|
'quantity': float(order.get('qty', 0)),
|
|
'filled_quantity': float(order.get('cumExecQty', 0)),
|
|
'price': float(order.get('price', 0)),
|
|
'status': self._map_order_status(order.get('orderStatus', '')),
|
|
'timestamp': int(order.get('createdTime', 0))
|
|
}
|
|
open_orders.append(order_info)
|
|
|
|
logger.debug(f"Found {len(open_orders)} open orders")
|
|
return open_orders
|
|
else:
|
|
logger.error(f"Failed to get open orders: {response}")
|
|
return []
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting open orders: {e}")
|
|
return []
|
|
|
|
def get_positions(self, symbol: str = None) -> List[Dict[str, Any]]:
|
|
"""Get position information.
|
|
|
|
Args:
|
|
symbol: Trading symbol (optional, gets all if None)
|
|
|
|
Returns:
|
|
List of position dictionaries
|
|
"""
|
|
try:
|
|
params = {'category': self.category}
|
|
if symbol:
|
|
params['symbol'] = self._format_symbol(symbol)
|
|
|
|
response = self.session.get_positions(**params)
|
|
|
|
if response.get('retCode') == 0:
|
|
positions = response.get('result', {}).get('list', [])
|
|
|
|
position_list = []
|
|
for pos in positions:
|
|
# Only include positions with non-zero size
|
|
size = float(pos.get('size', 0))
|
|
if size != 0:
|
|
position_info = {
|
|
'symbol': pos.get('symbol'),
|
|
'side': pos.get('side', '').lower(),
|
|
'size': size,
|
|
'entry_price': float(pos.get('avgPrice', 0)),
|
|
'mark_price': float(pos.get('markPrice', 0)),
|
|
'unrealized_pnl': float(pos.get('unrealisedPnl', 0)),
|
|
'percentage': float(pos.get('unrealisedPnlPct', 0)),
|
|
'leverage': float(pos.get('leverage', 0)),
|
|
'timestamp': int(pos.get('updatedTime', 0))
|
|
}
|
|
position_list.append(position_info)
|
|
|
|
logger.debug(f"Found {len(position_list)} positions")
|
|
return position_list
|
|
else:
|
|
logger.error(f"Failed to get positions: {response}")
|
|
return []
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting positions: {e}")
|
|
return []
|
|
|
|
def _format_symbol(self, symbol: str) -> str:
|
|
"""Format symbol for Bybit API.
|
|
|
|
Args:
|
|
symbol: Symbol in various formats
|
|
|
|
Returns:
|
|
Formatted symbol for Bybit
|
|
"""
|
|
# Remove any separators and convert to uppercase
|
|
clean_symbol = symbol.replace('/', '').replace('-', '').replace('_', '').upper()
|
|
|
|
# Common mappings
|
|
symbol_mapping = {
|
|
'BTCUSDT': 'BTCUSDT',
|
|
'ETHUSDT': 'ETHUSDT',
|
|
'BTCUSD': 'BTCUSDT', # Map to USDT perpetual
|
|
'ETHUSD': 'ETHUSDT', # Map to USDT perpetual
|
|
}
|
|
|
|
return symbol_mapping.get(clean_symbol, clean_symbol)
|
|
|
|
def _map_order_type(self, order_type: str) -> str:
|
|
"""Map order type to Bybit format.
|
|
|
|
Args:
|
|
order_type: Order type ('market', 'limit')
|
|
|
|
Returns:
|
|
Bybit order type
|
|
"""
|
|
type_mapping = {
|
|
'market': 'Market',
|
|
'limit': 'Limit',
|
|
'stop': 'Stop',
|
|
'stop_limit': 'StopLimit'
|
|
}
|
|
|
|
return type_mapping.get(order_type.lower(), 'Market')
|
|
|
|
def _map_order_status(self, status: str) -> str:
|
|
"""Map Bybit order status to standard format.
|
|
|
|
Args:
|
|
status: Bybit order status
|
|
|
|
Returns:
|
|
Standardized order status
|
|
"""
|
|
status_mapping = {
|
|
'New': 'open',
|
|
'PartiallyFilled': 'partially_filled',
|
|
'Filled': 'filled',
|
|
'Cancelled': 'cancelled',
|
|
'Rejected': 'rejected',
|
|
'PartiallyFilledCanceled': 'cancelled'
|
|
}
|
|
|
|
return status_mapping.get(status, status.lower())
|
|
|
|
def get_orderbook(self, symbol: str, depth: int = 25) -> Dict[str, Any]:
|
|
"""Get orderbook for a symbol.
|
|
|
|
Args:
|
|
symbol: Trading symbol
|
|
depth: Number of price levels to return (max 200)
|
|
|
|
Returns:
|
|
Dictionary with orderbook data
|
|
"""
|
|
try:
|
|
formatted_symbol = self._format_symbol(symbol)
|
|
|
|
response = self.session.get_orderbook(
|
|
category=self.category,
|
|
symbol=formatted_symbol,
|
|
limit=min(depth, 200) # Bybit max limit is 200
|
|
)
|
|
|
|
if response.get('retCode') == 0:
|
|
orderbook_data = response.get('result', {})
|
|
|
|
bids = []
|
|
asks = []
|
|
|
|
# Process bids (buy orders)
|
|
for bid in orderbook_data.get('b', []):
|
|
bids.append([float(bid[0]), float(bid[1])])
|
|
|
|
# Process asks (sell orders)
|
|
for ask in orderbook_data.get('a', []):
|
|
asks.append([float(ask[0]), float(ask[1])])
|
|
|
|
return {
|
|
'symbol': symbol,
|
|
'bids': bids,
|
|
'asks': asks,
|
|
'timestamp': int(orderbook_data.get('ts', 0))
|
|
}
|
|
else:
|
|
logger.error(f"Failed to get orderbook for {symbol}: {response}")
|
|
return {}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting orderbook for {symbol}: {e}")
|
|
return {}
|
|
|
|
def close_position(self, symbol: str, quantity: float = None) -> Dict[str, Any]:
|
|
"""Close a position (market order in opposite direction).
|
|
|
|
Args:
|
|
symbol: Trading symbol
|
|
quantity: Quantity to close (None for full position)
|
|
|
|
Returns:
|
|
Dictionary with order information
|
|
"""
|
|
try:
|
|
# Get current position
|
|
positions = self.get_positions(symbol)
|
|
if not positions:
|
|
logger.warning(f"No position found for {symbol}")
|
|
return {'error': 'No position found'}
|
|
|
|
position = positions[0]
|
|
position_size = position['size']
|
|
position_side = position['side']
|
|
|
|
# Determine close quantity
|
|
close_quantity = quantity if quantity is not None else abs(position_size)
|
|
|
|
# Determine opposite side
|
|
close_side = 'sell' if position_side == 'buy' else 'buy'
|
|
|
|
# Place market order to close position
|
|
return self.place_order(
|
|
symbol=symbol,
|
|
side=close_side,
|
|
order_type='market',
|
|
quantity=close_quantity
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error closing position for {symbol}: {e}")
|
|
return {'error': str(e)} |