Files
gogo2/NN/exchanges/bybit_interface.py
Dobromir Popov 24230f7f79 leverae tweak
2025-07-15 00:51:42 +03:00

1021 lines
41 KiB
Python

"""
Bybit Interface
"""
import logging
import time
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime, timezone
import json
import os
try:
from pybit.unified_trading import HTTP
except ImportError:
HTTP = None
logging.warning("pybit not installed. Run: pip install pybit")
from .exchange_interface import ExchangeInterface
from .bybit_rest_client import BybitRestClient
logger = logging.getLogger(__name__)
class BybitInterface(ExchangeInterface):
"""Bybit Exchange API Interface for cryptocurrency derivatives trading.
Supports both testnet and live trading environments.
Focus on USDT perpetuals and spot trading.
"""
def __init__(self, api_key: str = "", api_secret: str = "", test_mode: bool = True):
    """Set up Bybit-specific state on top of the base exchange interface.

    Args:
        api_key: Bybit API key; when empty, BYBIT_API_KEY is read from the environment
        api_secret: Bybit API secret; when empty, BYBIT_API_SECRET is read from the environment
        test_mode: If True, use testnet environment
    """
    super().__init__(api_key, api_secret, test_mode)

    # API clients; both stay None until they are assigned elsewhere.
    self.session = None
    self.rest_client = None
    self.use_fallback = False  # True -> route calls through the REST client

    # Market selection.
    self.category = "linear"  # USDT perpetuals by default
    self.supported_symbols = set()

    # Short-lived open-orders cache (seconds) to limit API traffic.
    self._open_orders_cache = {}
    self._open_orders_cache_time = 0
    self._cache_timeout = 5

    # Longer-lived instrument metadata cache (seconds), used for order-size checks.
    self._instrument_cache = {}
    self._instrument_cache_time = 0
    self._instrument_cache_timeout = 300

    # Leverage bookkeeping: default multiplier and per-symbol cache.
    self.default_leverage = 10.0
    self.leverage_cache = {}

    # Environment variables only fill in credentials that were not passed in.
    if not api_key:
        self.api_key = os.getenv('BYBIT_API_KEY', '')
    if not api_secret:
        self.api_secret = os.getenv('BYBIT_API_SECRET', '')

    logger.info(f"Initialized BybitInterface (testnet: {test_mode})")
def connect(self) -> bool:
"""Connect to Bybit API.
Returns:
bool: True if connection successful, False otherwise
"""
try:
if HTTP is None:
logger.error("pybit library not installed")
return False
if not self.api_key or not self.api_secret:
logger.error("API key and secret required for Bybit connection")
return False
# Initialize pybit session
self.session = HTTP(
testnet=self.test_mode,
api_key=self.api_key,
api_secret=self.api_secret,
)
# Initialize raw REST client as fallback
self.rest_client = BybitRestClient(
api_key=self.api_key,
api_secret=self.api_secret,
testnet=self.test_mode
)
# Test pybit connection first
try:
account_info = self.session.get_wallet_balance(accountType="UNIFIED")
if account_info.get('retCode') == 0:
logger.info(f"Successfully connected to Bybit via pybit (testnet: {self.test_mode})")
self.use_fallback = False
self._load_instruments()
return True
else:
logger.warning(f"pybit connection failed: {account_info}")
raise Exception("pybit connection failed")
except Exception as e:
logger.warning(f"pybit failed, trying REST client fallback: {e}")
# Test REST client fallback
if self.rest_client.test_connectivity() and self.rest_client.test_authentication():
logger.info(f"Successfully connected to Bybit via REST client fallback (testnet: {self.test_mode})")
self.use_fallback = True
self._load_instruments()
return True
else:
logger.error("Both pybit and REST client failed")
return False
except Exception as e:
logger.error(f"Error connecting to Bybit: {e}")
return False
def _load_instruments(self) -> None:
    """Fill ``supported_symbols`` with the symbols listed for the current category.

    Failures are logged as warnings and leave ``supported_symbols`` untouched.
    """
    try:
        if not self.session:
            logger.warning("No session available for loading instruments")
            return

        reply = self.session.get_instruments_info(category=self.category)
        if not reply or reply.get('retCode') != 0:
            logger.warning(f"Failed to load instruments: {reply}")
            return

        listed = reply.get('result', {}).get('list', [])
        self.supported_symbols = set(entry['symbol'] for entry in listed)
        logger.info(f"Loaded {len(self.supported_symbols)} instruments")
    except Exception as e:
        logger.warning(f"Error loading instruments: {e}")
def get_instruments(self, category: str = "linear") -> List[Dict[str, Any]]:
    """Fetch the raw instrument list for a category.

    Args:
        category: Instrument category (linear, spot, inverse, option)
    Returns:
        List of instrument dictionaries; empty when there is no session or
        the request fails
    """
    try:
        if not self.session:
            logger.error("No session available for getting instruments")
            return []

        reply = self.session.get_instruments_info(category=category)
        succeeded = bool(reply) and reply.get('retCode') == 0
        if not succeeded:
            logger.error(f"Failed to get instruments: {reply}")
            return []

        return reply.get('result', {}).get('list', [])
    except Exception as e:
        logger.error(f"Error getting instruments: {e}")
        return []
def get_balance(self, asset: str) -> float:
    """Look up the balance of one coin in the UNIFIED wallet.

    The first non-empty field among ``availableToWithdraw``, ``equity`` and
    ``walletBalance`` (in that order) is used as the balance.

    Args:
        asset: Asset symbol (e.g., 'BTC', 'USDT'); matched case-insensitively
    Returns:
        float: Balance of the asset, or 0.0 when it is absent or the request fails
    """
    try:
        wallet = self.session.get_wallet_balance(accountType="UNIFIED")
        if wallet.get('retCode') != 0:
            logger.error(f"Failed to get account balance: {wallet}")
            return 0.0

        for account in wallet.get('result', {}).get('list', []):
            for entry in account.get('coin', []):
                if entry.get('coin', '').upper() != asset.upper():
                    continue
                # Empty strings are falsy, so `or` walks the fallback chain.
                raw = (entry.get('availableToWithdraw', '')
                       or entry.get('equity', '')
                       or entry.get('walletBalance', '0'))
                amount = float(raw) if raw else 0.0
                logger.debug(f"Balance for {asset}: {amount}")
                return amount

        logger.debug(f"No balance found for asset {asset}")
        return 0.0
    except Exception as e:
        logger.error(f"Error getting balance for {asset}: {e}")
        return 0.0
def get_account_summary(self) -> Dict[str, Any]:
    """Return the raw UNIFIED wallet-balance response.

    Returns:
        The response dictionary when its ``retCode`` is 0, otherwise an
        empty dictionary (also on any exception)
    """
    try:
        wallet = self.session.get_wallet_balance(accountType="UNIFIED")
        if wallet.get('retCode') != 0:
            logger.error(f"Failed to get account summary: {wallet}")
            return {}
        return wallet
    except Exception as e:
        logger.error(f"Error getting account summary: {e}")
        return {}
def get_account_info(self) -> Dict[str, Any]:
    """Compatibility alias: delegates to ``get_account_summary``.

    Returns:
        Whatever ``get_account_summary`` returns
    """
    summary = self.get_account_summary()
    return summary
def get_all_balances(self) -> Dict[str, Dict[str, float]]:
    """Collect every coin balance from the UNIFIED wallet.

    For each coin, ``free`` is the first non-empty value among
    ``availableToWithdraw``, ``equity`` and ``walletBalance``; ``locked`` is
    the coin's ``locked`` field (0.0 when empty).

    Returns:
        Dictionary in the format
        {asset: {'free': float, 'locked': float, 'total': float}},
        where total is free + locked; empty on failure
    """
    try:
        wallet = self.session.get_wallet_balance(accountType="UNIFIED")
        if wallet.get('retCode') != 0:
            logger.error(f"Failed to get all balances: {wallet}")
            return {}

        result = {}
        for account in wallet.get('result', {}).get('list', []):
            for entry in account.get('coin', []):
                name = entry.get('coin', '')
                if not name:
                    continue

                # Empty strings are falsy, so `or` picks the first usable field.
                raw_free = (entry.get('availableToWithdraw', '')
                            or entry.get('equity', '')
                            or entry.get('walletBalance', ''))
                free = float(raw_free) if raw_free else 0.0

                raw_locked = entry.get('locked', '')
                held = float(raw_locked) if raw_locked else 0.0

                result[name] = {
                    'free': free,
                    'locked': held,
                    'total': free + held
                }

        logger.debug(f"Retrieved balances for {len(result)} assets")
        return result
    except Exception as e:
        logger.error(f"Error getting all balances: {e}")
        return {}
def get_ticker(self, symbol: str) -> Dict[str, Any]:
    """Get ticker information for a symbol and cache its last price.

    Args:
        symbol: Trading symbol (e.g., 'BTCUSDT')
    Returns:
        Dictionary with keys symbol, last_price, bid_price, ask_price,
        volume_24h, change_24h, high_24h, low_24h and timestamp; an empty
        dictionary when the request fails or returns no data
    """
    def _num(value: Any) -> float:
        # Numeric fields arrive as strings and may be "" when there is no
        # value (e.g. an empty side of the book); treat that as 0.0 instead
        # of letting float("") discard the whole ticker.
        return float(value) if value not in (None, "") else 0.0

    try:
        formatted_symbol = self._format_symbol(symbol)
        ticker_response = self.session.get_tickers(
            category=self.category,
            symbol=formatted_symbol
        )
        if ticker_response.get('retCode') != 0:
            logger.error(f"Failed to get ticker for {symbol}: {ticker_response}")
            return {}

        ticker_data = ticker_response.get('result', {}).get('list', [])
        if not ticker_data:
            logger.error(f"No ticker data for {symbol}")
            return {}

        ticker = ticker_data[0]

        # Cache the last price under the caller's (unformatted) symbol.
        last_price = _num(ticker.get('lastPrice'))
        self.last_price_cache[symbol] = last_price

        # The v5 response carries the server time on the envelope ('time'),
        # not on each list entry, so fall back to the envelope; otherwise
        # the timestamp would always be 0.
        timestamp = int(ticker.get('time') or ticker_response.get('time') or 0)

        return {
            'symbol': symbol,
            'last_price': last_price,
            'bid_price': _num(ticker.get('bid1Price')),
            'ask_price': _num(ticker.get('ask1Price')),
            'volume_24h': _num(ticker.get('volume24h')),
            'change_24h': _num(ticker.get('price24hPcnt')),
            'high_24h': _num(ticker.get('highPrice24h')),
            'low_24h': _num(ticker.get('lowPrice24h')),
            'timestamp': timestamp
        }
    except Exception as e:
        logger.error(f"Error getting ticker for {symbol}: {e}")
        return {}
def get_instrument_info(self, symbol: str) -> Dict[str, Any]:
    """Get instrument information (including lot-size limits) with caching.

    The whole instrument list for the current category is cached for
    ``_instrument_cache_timeout`` seconds. When a refresh returns nothing
    (``get_instruments`` returns an empty list on failure), the previous
    cache is kept and used as a fallback instead of being wiped.

    Args:
        symbol: Trading symbol (e.g., 'ETHUSDT')
    Returns:
        Dictionary with instrument information, or an empty dictionary when
        the instrument is unknown or no data is available
    """
    try:
        formatted_symbol = self._format_symbol(symbol)
        current_time = time.time()

        # Serve from the cache while it is fresh.
        cache_is_fresh = (
            current_time - self._instrument_cache_time < self._instrument_cache_timeout
        )
        if cache_is_fresh and formatted_symbol in self._instrument_cache:
            logger.debug(f"Returning cached instrument info for {formatted_symbol}")
            return self._instrument_cache[formatted_symbol]

        if not self.session:
            logger.error("No session available for getting instruments")
            return {}

        instruments = self.get_instruments(self.category)
        refreshed = {
            instrument['symbol']: instrument
            for instrument in instruments
            if isinstance(instrument, dict) and 'symbol' in instrument
        }

        if refreshed:
            # Replace the contents in place so existing references stay valid.
            self._instrument_cache.clear()
            self._instrument_cache.update(refreshed)
            self._instrument_cache_time = current_time
        else:
            # A failed/empty refresh must not discard known-good data nor be
            # stamped as fresh; stale lot-size info beats none at all.
            logger.warning("Instrument refresh returned no data, keeping existing cache")

        instrument_info = self._instrument_cache.get(formatted_symbol, {})
        if not instrument_info:
            logger.warning(f"Instrument {formatted_symbol} not found")
        return instrument_info
    except Exception as e:
        logger.error(f"Error getting instrument info for {symbol}: {e}")
        return {}
def _validate_order_size(self, symbol: str, quantity: float) -> Tuple[bool, float, str]:
    """Validate and adjust order size according to instrument requirements.

    A quantity below the instrument minimum is raised to the minimum; a
    quantity above the maximum, or one that is not strictly positive, is
    rejected; otherwise the quantity is rounded to the nearest multiple of
    the instrument's quantity step.

    Args:
        symbol: Trading symbol
        quantity: Requested quantity
    Returns:
        Tuple of (is_valid, adjusted_quantity, error_message)
    """
    # Local import keeps this method self-contained.
    from decimal import Decimal, ROUND_HALF_EVEN

    try:
        instrument_info = self.get_instrument_info(symbol)
        if not instrument_info:
            return False, quantity, f"Could not get instrument info for {symbol}"

        # Zero, negative and NaN quantities are caller errors; without this
        # check they would be silently bumped to the minimum order size.
        if not quantity > 0:
            return False, quantity, f"Order quantity must be positive, got {quantity}"

        lot_size_filter = instrument_info.get('lotSizeFilter', {})
        min_order_qty = float(lot_size_filter.get('minOrderQty', 0.01))
        max_order_qty = float(lot_size_filter.get('maxOrderQty', 10000))
        qty_step = float(lot_size_filter.get('qtyStep', 0.01))
        logger.debug(f"Validation for {symbol}: min={min_order_qty}, max={max_order_qty}, step={qty_step}, requested={quantity}")

        # Check minimum order size
        if quantity < min_order_qty:
            adjusted_quantity = min_order_qty
            logger.warning(f"Order quantity {quantity} below minimum {min_order_qty} for {symbol}, adjusting to {adjusted_quantity}")
            return True, adjusted_quantity, f"Adjusted quantity from {quantity:.6f} to minimum {adjusted_quantity:.6f}"

        # Check maximum order size
        if quantity > max_order_qty:
            return False, quantity, f"Order quantity {quantity} exceeds maximum {max_order_qty} for {symbol}"

        if qty_step <= 0:
            return True, quantity, ""

        # Round to the nearest step in decimal arithmetic. Float maths gives
        # e.g. 3 * 0.1 == 0.30000000000000004, which would be sent verbatim
        # once the quantity is converted with str().
        step = Decimal(str(qty_step))
        step_count = (Decimal(str(quantity)) / step).to_integral_value(rounding=ROUND_HALF_EVEN)
        adjusted_quantity = float(step_count * step)

        # Ensure we don't go below minimum after rounding
        if adjusted_quantity < min_order_qty:
            adjusted_quantity = min_order_qty

        if abs(adjusted_quantity - quantity) > 0.000001:  # Only log meaningful differences
            logger.info(f"Adjusted quantity for step size: {quantity:.6f} -> {adjusted_quantity:.6f}")
        return True, adjusted_quantity, ""
    except Exception as e:
        logger.error(f"Error validating order size for {symbol}: {e}")
        return False, quantity, f"Validation error: {e}"
def set_leverage(self, symbol: str, leverage: float) -> bool:
    """Set leverage for a symbol (same value for the buy and sell side).

    A "leverage not modified" reply from the exchange (code 110043) means
    the requested value is already in effect and is treated as success.

    Args:
        symbol: Trading symbol (e.g., 'ETHUSDT')
        leverage: Leverage value (e.g., 10.0 for 10x); must be within 1-100
    Returns:
        bool: True if the symbol now has the requested leverage, False otherwise
    """
    # Bybit return code for "Set leverage has not been modified".
    not_modified_code = 110043

    try:
        if not self.session:
            logger.error("No session available for setting leverage")
            return False

        formatted_symbol = self._format_symbol(symbol)

        # Written as a chained comparison so that NaN is rejected as well.
        if not 1.0 <= leverage <= 100.0:
            logger.error(f"Invalid leverage value: {leverage}. Must be between 1.0 and 100.0")
            return False

        try:
            response = self.session.set_leverage(
                category=self.category,
                symbol=formatted_symbol,
                buyLeverage=str(leverage),
                sellLeverage=str(leverage)
            )
        except Exception as api_error:
            # The client may raise instead of returning a non-zero retCode;
            # recognise the "not modified" case by status code or message.
            unchanged = (
                getattr(api_error, 'status_code', None) == not_modified_code
                or str(not_modified_code) in str(api_error)
            )
            if not unchanged:
                raise
            response = {'retCode': not_modified_code}

        ret_code = response.get('retCode')
        if ret_code == 0:
            self.leverage_cache[formatted_symbol] = leverage
            logger.info(f"Successfully set leverage for {symbol} to {leverage}x")
            return True
        if ret_code == not_modified_code:
            self.leverage_cache[formatted_symbol] = leverage
            logger.debug(f"Leverage for {symbol} already at {leverage}x, nothing to change")
            return True

        error_msg = response.get('retMsg', 'Unknown error')
        logger.error(f"Failed to set leverage for {symbol}: {error_msg}")
        return False
    except Exception as e:
        logger.error(f"Error setting leverage for {symbol}: {e}")
        return False
def get_leverage(self, symbol: str) -> float:
    """Return the leverage recorded for a symbol, querying positions if needed.

    A value already in ``leverage_cache`` is returned directly. Otherwise
    the position list is queried and the first entry for the symbol supplies
    (and caches) the leverage. ``default_leverage`` is returned when there
    is no session, no matching position, or an error occurs.

    Args:
        symbol: Trading symbol (e.g., 'ETHUSDT')
    Returns:
        float: Current leverage value
    """
    try:
        if not self.session:
            logger.error("No session available for getting leverage")
            return self.default_leverage

        key = self._format_symbol(symbol)

        try:
            return self.leverage_cache[key]
        except KeyError:
            pass  # not cached yet, ask the exchange

        reply = self.session.get_positions(
            category=self.category,
            symbol=key
        )
        if reply.get('retCode') == 0:
            entries = reply.get('result', {}).get('list', [])
            match = next((item for item in entries if item.get('symbol') == key), None)
            if match is not None:
                value = float(match.get('leverage', self.default_leverage))
                self.leverage_cache[key] = value
                logger.debug(f"Current leverage for {symbol}: {value}x")
                return value

        logger.debug(f"No position found for {symbol}, using default leverage: {self.default_leverage}x")
        return self.default_leverage
    except Exception as e:
        logger.error(f"Error getting leverage for {symbol}: {e}")
        return self.default_leverage
def ensure_leverage(self, symbol: str, target_leverage: float = None) -> bool:
    """Make sure a symbol's leverage matches the target, setting it if it differs.

    The value from ``get_leverage`` is compared with the target; a
    difference below 0.1 counts as already matching.

    Args:
        symbol: Trading symbol
        target_leverage: Target leverage (uses default if None)
    Returns:
        bool: True if leverage is set correctly
    """
    try:
        wanted = self.default_leverage if target_leverage is None else target_leverage
        present = self.get_leverage(symbol)

        needs_change = not (abs(present - wanted) < 0.1)
        if needs_change:
            logger.info(f"Setting leverage for {symbol} from {present}x to {wanted}x")
            return self.set_leverage(symbol, wanted)

        logger.debug(f"Leverage for {symbol} already set to {present}x (target: {wanted}x)")
        return True
    except Exception as e:
        logger.error(f"Error ensuring leverage for {symbol}: {e}")
        return False
def place_order(self, symbol: str, side: str, order_type: str,
                quantity: float, price: float = None) -> Dict[str, Any]:
    """Place an order with minimum size validation and leverage support.

    The request is checked locally first (side must be buy/sell, a limit
    order needs a price), then leverage is ensured, the quantity is
    validated/adjusted, and the order is submitted. A successful submission
    clears the open-orders cache so the new order is visible immediately.

    Args:
        symbol: Trading symbol (e.g., 'BTCUSDT')
        side: 'buy' or 'sell' (case-insensitive)
        order_type: 'market' or 'limit'
        quantity: Order quantity
        price: Order price (required for limit orders)
    Returns:
        Dictionary with order information, or {'error': message} on failure
    """
    try:
        # Reject malformed requests before making any API call.
        if side.lower() not in ('buy', 'sell'):
            error_msg = f"Invalid order side: {side}. Must be 'buy' or 'sell'"
            logger.error(f"Order validation failed: {error_msg}")
            return {'error': error_msg}

        is_limit = order_type.lower() == 'limit'
        if is_limit and price is None:
            # Without this the order would be submitted as a limit order
            # with no price and only fail at the exchange.
            error_msg = "Price is required for limit orders"
            logger.error(f"Order validation failed: {error_msg}")
            return {'error': error_msg}

        formatted_symbol = self._format_symbol(symbol)

        # Leverage problems are logged but do not block the order.
        if not self.ensure_leverage(symbol, self.default_leverage):
            logger.warning(f"Failed to set leverage for {symbol}, proceeding with order anyway")

        # Validate and adjust order size
        is_valid, adjusted_quantity, error_msg = self._validate_order_size(formatted_symbol, quantity)
        if not is_valid:
            logger.error(f"Order validation failed: {error_msg}")
            return {'error': error_msg}

        if adjusted_quantity != quantity:
            logger.info(f"BYBIT ORDER SIZE ADJUSTMENT: {symbol} quantity {quantity:.6f} -> {adjusted_quantity:.6f}")

        order_params = {
            'category': self.category,
            'symbol': formatted_symbol,
            'side': side.capitalize(),  # 'Buy' or 'Sell'
            'orderType': self._map_order_type(order_type),
            'qty': str(adjusted_quantity),
        }
        if is_limit:
            order_params['price'] = str(price)
            order_params['timeInForce'] = 'GTC'  # Good Till Cancelled

        response = self.session.place_order(**order_params)
        if response.get('retCode') != 0:
            error_msg = response.get('retMsg', 'Unknown error')
            logger.error(f"Failed to place order: {error_msg}")
            return {'error': error_msg}

        # Cached open-order lists no longer reflect the exchange state.
        self._open_orders_cache.clear()

        result = response.get('result', {})
        order_info = {
            'order_id': result.get('orderId'),
            'symbol': symbol,
            'side': side,
            'type': order_type,
            'quantity': adjusted_quantity,  # the quantity actually submitted
            'price': price,
            'status': 'submitted',
            'timestamp': int(time.time() * 1000)
        }
        current_leverage = self.get_leverage(symbol)
        logger.info(f"Successfully placed {order_type} {side} order for {adjusted_quantity} {symbol} at {current_leverage}x leverage")
        return order_info
    except Exception as e:
        logger.error(f"Error placing order: {e}")
        return {'error': str(e)}
def _process_pybit_orders(self, orders_list: List[Dict]) -> List[Dict[str, Any]]:
    """Convert raw order dictionaries into the interface's order format.

    Args:
        orders_list: Order dictionaries as found in the response's result list
    Returns:
        List of dictionaries with order_id, symbol, side, type, quantity,
        filled_quantity, price, status and timestamp
    """
    def _num(value: Any) -> float:
        # Numeric fields arrive as strings and can be "" when unset; map that
        # to 0.0 so one such field does not make the whole batch fail.
        return float(value) if value not in (None, "") else 0.0

    return [
        {
            'order_id': order.get('orderId'),
            'symbol': order.get('symbol'),
            'side': order.get('side', '').lower(),
            'type': order.get('orderType', '').lower(),
            'quantity': _num(order.get('qty')),
            'filled_quantity': _num(order.get('cumExecQty')),
            'price': _num(order.get('price')),
            'status': self._map_order_status(order.get('orderStatus', '')),
            'timestamp': int(order.get('createdTime') or 0)
        }
        for order in orders_list
    ]
def _process_rest_orders(self, orders_list: List[Dict]) -> List[Dict[str, Any]]:
    """Normalise orders returned by the REST client.

    The REST payload is handled exactly like the pybit one, so this simply
    delegates to ``_process_pybit_orders``.
    """
    normalised = self._process_pybit_orders(orders_list)
    return normalised
def cancel_order(self, symbol: str, order_id: str) -> bool:
    """Cancel an order.

    On success the open-orders cache is cleared so that a following
    ``get_open_orders`` call does not keep reporting the cancelled order
    until the cache times out.

    Args:
        symbol: Trading symbol
        order_id: Order ID to cancel
    Returns:
        bool: True if order was cancelled successfully
    """
    try:
        formatted_symbol = self._format_symbol(symbol)
        response = self.session.cancel_order(
            category=self.category,
            symbol=formatted_symbol,
            orderId=order_id
        )
        if response.get('retCode') != 0:
            error_msg = response.get('retMsg', 'Unknown error')
            logger.error(f"Failed to cancel order {order_id}: {error_msg}")
            return False

        # Cached open-order lists still contain the order we just cancelled.
        self._open_orders_cache.clear()
        logger.info(f"Successfully cancelled order {order_id}")
        return True
    except Exception as e:
        logger.error(f"Error cancelling order {order_id}: {e}")
        return False
def get_order_status(self, symbol: str, order_id: str) -> Dict[str, Any]:
    """Get status of an order.

    The open-orders endpoint is queried first; when it returns no entry for
    the ID (the order may already be filled or cancelled) the lookup falls
    back to ``_get_order_from_history``.

    Args:
        symbol: Trading symbol
        order_id: Order ID
    Returns:
        Dictionary with order status information, or an empty dictionary on
        failure
    """
    def _num(value: Any) -> float:
        # Numeric fields are strings and may be "" (e.g. avgPrice of an order
        # with no fills); float("") would raise and the caller would get {}
        # for a perfectly valid open order.
        return float(value) if value not in (None, "") else 0.0

    try:
        formatted_symbol = self._format_symbol(symbol)
        response = self.session.get_open_orders(
            category=self.category,
            symbol=formatted_symbol,
            orderId=order_id
        )
        if response.get('retCode') != 0:
            logger.error(f"Failed to get order status: {response}")
            return {}

        orders = response.get('result', {}).get('list', [])
        if not orders:
            # Order might be filled/cancelled, check order history
            return self._get_order_from_history(symbol, order_id)

        order = orders[0]
        return {
            'order_id': order.get('orderId'),
            'symbol': symbol,
            'side': order.get('side', '').lower(),
            'type': order.get('orderType', '').lower(),
            'quantity': _num(order.get('qty')),
            'filled_quantity': _num(order.get('cumExecQty')),
            'price': _num(order.get('price')),
            'average_price': _num(order.get('avgPrice')),
            'status': self._map_order_status(order.get('orderStatus', '')),
            'timestamp': int(order.get('createdTime') or 0)
        }
    except Exception as e:
        logger.error(f"Error getting order status for {order_id}: {e}")
        return {}
def _get_order_from_history(self, symbol: str, order_id: str) -> Dict[str, Any]:
    """Get order from order history (for filled/cancelled orders).

    Args:
        symbol: Trading symbol
        order_id: Order ID
    Returns:
        Dictionary with order status information, or an empty dictionary
        when the order is not found or the request fails
    """
    def _num(value: Any) -> float:
        # Numeric fields are strings and may be "" (e.g. avgPrice of an order
        # cancelled before any fill); treat that as 0.0 rather than raising.
        return float(value) if value not in (None, "") else 0.0

    try:
        formatted_symbol = self._format_symbol(symbol)
        response = self.session.get_order_history(
            category=self.category,
            symbol=formatted_symbol,
            orderId=order_id
        )
        if response.get('retCode') != 0:
            return {}

        orders = response.get('result', {}).get('list', [])
        if not orders:
            return {}

        order = orders[0]
        return {
            'order_id': order.get('orderId'),
            'symbol': symbol,
            'side': order.get('side', '').lower(),
            'type': order.get('orderType', '').lower(),
            'quantity': _num(order.get('qty')),
            'filled_quantity': _num(order.get('cumExecQty')),
            'price': _num(order.get('price')),
            'average_price': _num(order.get('avgPrice')),
            'status': self._map_order_status(order.get('orderStatus', '')),
            'timestamp': int(order.get('createdTime') or 0)
        }
    except Exception as e:
        logger.error(f"Error getting order from history: {e}")
        return {}
def get_open_orders(self, symbol: str = None) -> List[Dict[str, Any]]:
    """Get open orders with caching and fallback to REST client.

    Results are cached per key (the symbol, or 'all') for ``_cache_timeout``
    seconds, each key with its own fetch time. pybit is used unless
    ``use_fallback`` is set; on any pybit failure the REST client is tried.
    Failed responses are never cached.

    Args:
        symbol: Trading symbol (optional, gets all if None)
    Returns:
        List of open order dictionaries (empty on failure)
    """
    try:
        current_time = time.time()
        cache_key = symbol or 'all'

        # Fetch times are tracked per cache key. With one shared timestamp,
        # refreshing one key would make stale entries of other keys look
        # fresh again. The dict is created lazily on first use.
        cache_times = getattr(self, '_open_orders_cache_times', None)
        if cache_times is None:
            cache_times = {}
            self._open_orders_cache_times = cache_times

        if (cache_key in self._open_orders_cache and
                current_time - cache_times.get(cache_key, 0) < self._cache_timeout):
            logger.debug(f"Returning cached open orders for {cache_key}")
            return self._open_orders_cache[cache_key]

        def _remember(orders: List[Dict[str, Any]]) -> None:
            # Store a fresh result together with its own fetch time.
            self._open_orders_cache[cache_key] = orders
            cache_times[cache_key] = current_time
            self._open_orders_cache_time = current_time  # kept for compatibility

        # Try pybit first if not using fallback
        if not self.use_fallback and self.session:
            try:
                params = {
                    'category': self.category,
                    # Documented values are integers; 0 selects open orders only.
                    'openOnly': 0
                }
                if symbol:
                    params['symbol'] = self._format_symbol(symbol)
                response = self.session.get_open_orders(**params)
                if response.get('retCode') != 0:
                    logger.warning(f"pybit get_open_orders failed: {response}")
                    raise Exception("pybit failed")

                orders = self._process_pybit_orders(response.get('result', {}).get('list', []))
                _remember(orders)
                logger.debug(f"Found {len(orders)} open orders via pybit, cached for {self._cache_timeout}s")
                return orders
            except Exception as e:
                error_str = str(e)
                if "10016" in error_str or "System error" in error_str:
                    # Stay on the REST client from now on.
                    logger.warning(f"pybit rate limited (Error 10016), switching to REST fallback: {e}")
                    self.use_fallback = True
                else:
                    logger.warning(f"pybit get_open_orders error, trying REST fallback: {e}")

        # Use REST client (either as primary or fallback)
        if not self.rest_client:
            logger.error("No available API client (pybit or REST)")
            return []

        formatted_symbol = self._format_symbol(symbol) if symbol else None
        response = self.rest_client.get_open_orders(self.category, formatted_symbol)
        if response.get('retCode', 0) != 0:
            # An error reply has no order list; caching it would make a
            # failure look like "no open orders" for the cache lifetime.
            logger.error(f"REST get_open_orders failed: {response}")
            return []

        orders = self._process_rest_orders(response.get('result', {}).get('list', []))
        _remember(orders)
        logger.debug(f"Found {len(orders)} open orders via REST client, cached for {self._cache_timeout}s")
        return orders
    except Exception as e:
        logger.error(f"Error getting open orders: {e}")
        return []
def get_positions(self, symbol: str = None) -> List[Dict[str, Any]]:
    """Get position information for positions with a non-zero size.

    Args:
        symbol: Trading symbol (optional, gets all if None)
    Returns:
        List of position dictionaries with symbol, side, size, entry_price,
        mark_price, unrealized_pnl, percentage, leverage and timestamp;
        empty on failure
    """
    def _num(value: Any) -> float:
        # Numeric fields are strings and may be "" when not applicable;
        # treat that as 0.0 instead of failing the whole request.
        return float(value) if value not in (None, "") else 0.0

    try:
        params = {'category': self.category}
        if symbol:
            params['symbol'] = self._format_symbol(symbol)
        elif self.category == 'linear':
            # The position endpoint needs either a symbol or a settle coin
            # for linear contracts; this interface targets USDT perpetuals,
            # so "all positions" means all USDT-settled ones.
            params['settleCoin'] = 'USDT'

        response = self.session.get_positions(**params)
        if response.get('retCode') != 0:
            logger.error(f"Failed to get positions: {response}")
            return []

        position_list = []
        for pos in response.get('result', {}).get('list', []):
            # Only include positions with non-zero size
            size = _num(pos.get('size'))
            if size == 0:
                continue
            position_list.append({
                'symbol': pos.get('symbol'),
                'side': pos.get('side', '').lower(),
                'size': size,
                'entry_price': _num(pos.get('avgPrice')),
                'mark_price': _num(pos.get('markPrice')),
                'unrealized_pnl': _num(pos.get('unrealisedPnl')),
                'percentage': _num(pos.get('unrealisedPnlPct')),
                'leverage': _num(pos.get('leverage')),
                'timestamp': int(pos.get('updatedTime') or 0)
            })

        logger.debug(f"Found {len(position_list)} positions")
        return position_list
    except Exception as e:
        logger.error(f"Error getting positions: {e}")
        return []
def _format_symbol(self, symbol: str) -> str:
    """Normalise a symbol to the form used in Bybit requests.

    Separators ('/', '-', '_') are dropped and the result is upper-cased;
    'BTCUSD' and 'ETHUSD' are then redirected to their USDT perpetual.

    Args:
        symbol: Symbol in various formats
    Returns:
        Formatted symbol for Bybit
    """
    # One pass that deletes all three separator characters.
    compact = symbol.translate(str.maketrans('', '', '/-_')).upper()

    # USD-quoted names are aliases for the USDT perpetuals.
    usd_aliases = {
        'BTCUSD': 'BTCUSDT',
        'ETHUSD': 'ETHUSDT',
    }
    if compact in usd_aliases:
        return usd_aliases[compact]
    return compact
def _map_order_type(self, order_type: str) -> str:
    """Translate an order type name into Bybit's spelling.

    Matching is case-insensitive; anything unrecognised maps to 'Market'.

    Args:
        order_type: Order type ('market', 'limit', 'stop', 'stop_limit')
    Returns:
        Bybit order type
    """
    wanted = order_type.lower()
    if wanted == 'limit':
        return 'Limit'
    if wanted == 'stop':
        return 'Stop'
    if wanted == 'stop_limit':
        return 'StopLimit'
    # 'market' and every unknown value
    return 'Market'
def _map_order_status(self, status: str) -> str:
    """Translate a Bybit order status into the interface's status names.

    Statuses without an explicit mapping are returned lower-cased.

    Args:
        status: Bybit order status
    Returns:
        Standardized order status
    """
    known = dict(
        New='open',
        PartiallyFilled='partially_filled',
        Filled='filled',
        Cancelled='cancelled',
        Rejected='rejected',
        PartiallyFilledCanceled='cancelled',
    )
    try:
        return known[status]
    except KeyError:
        return status.lower()
def get_orderbook(self, symbol: str, depth: int = 25) -> Dict[str, Any]:
    """Fetch the order book for a symbol as float [price, size] pairs.

    Args:
        symbol: Trading symbol
        depth: Number of price levels to request (capped at 200)
    Returns:
        Dictionary with symbol, bids, asks and timestamp; empty on failure
    """
    try:
        levels = min(depth, 200)  # requests are capped at 200 levels
        reply = self.session.get_orderbook(
            category=self.category,
            symbol=self._format_symbol(symbol),
            limit=levels
        )
        if reply.get('retCode') != 0:
            logger.error(f"Failed to get orderbook for {symbol}: {reply}")
            return {}

        book = reply.get('result', {})
        # 'b' holds the buy side, 'a' the sell side; each level is [price, size].
        buy_side = [[float(level[0]), float(level[1])] for level in book.get('b', [])]
        sell_side = [[float(level[0]), float(level[1])] for level in book.get('a', [])]
        return {
            'symbol': symbol,
            'bids': buy_side,
            'asks': sell_side,
            'timestamp': int(book.get('ts', 0))
        }
    except Exception as e:
        logger.error(f"Error getting orderbook for {symbol}: {e}")
        return {}
def close_position(self, symbol: str, quantity: float = None) -> Dict[str, Any]:
    """Close a position by placing a market order on the opposite side.

    The first entry returned by ``get_positions(symbol)`` is used; the order
    itself goes through ``place_order``.

    Args:
        symbol: Trading symbol
        quantity: Quantity to close (None for full position)
    Returns:
        Dictionary with order information, or {'error': message}
    """
    try:
        held = self.get_positions(symbol)
        if not held:
            logger.warning(f"No position found for {symbol}")
            return {'error': 'No position found'}

        current = held[0]
        held_size = current['size']
        held_side = current['side']

        # Explicit quantity wins; otherwise close the entire position.
        if quantity is None:
            amount = abs(held_size)
        else:
            amount = quantity

        # A long ('buy') is closed by selling; anything else by buying.
        opposite = 'buy' if held_side != 'buy' else 'sell'

        return self.place_order(
            symbol=symbol,
            side=opposite,
            order_type='market',
            quantity=amount
        )
    except Exception as e:
        logger.error(f"Error closing position for {symbol}: {e}")
        return {'error': str(e)}