778 lines
31 KiB
Python
778 lines
31 KiB
Python
"""
|
|
Bybit Interface
|
|
|
|
"""
|
|
|
|
|
|
import logging
|
|
import time
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
from datetime import datetime, timezone
|
|
import json
|
|
import os
|
|
|
|
try:
|
|
from pybit.unified_trading import HTTP
|
|
except ImportError:
|
|
HTTP = None
|
|
logging.warning("pybit not installed. Run: pip install pybit")
|
|
|
|
from .exchange_interface import ExchangeInterface
|
|
from .bybit_rest_client import BybitRestClient
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BybitInterface(ExchangeInterface):
|
|
"""Bybit Exchange API Interface for cryptocurrency derivatives trading.
|
|
|
|
Supports both testnet and live trading environments.
|
|
Focus on USDT perpetuals and spot trading.
|
|
"""
|
|
|
|
def __init__(self, api_key: str = "", api_secret: str = "", test_mode: bool = True):
    """Prepare the interface state; no network connection is opened here.

    Args:
        api_key: Bybit API key. When empty, the ``BYBIT_API_KEY``
            environment variable is used instead.
        api_secret: Bybit API secret. When empty, the ``BYBIT_API_SECRET``
            environment variable is used instead.
        test_mode: If True, use testnet environment.
    """
    super().__init__(api_key, api_secret, test_mode)

    # API clients are created by connect().
    self.session = None
    self.rest_client = None  # raw REST client kept as a fallback
    self.use_fallback = False  # set once the REST client should be used

    # Market selection
    self.category = "linear"  # USDT perpetuals unless changed
    self.supported_symbols = set()

    # Short-lived open-orders cache to limit API calls / rate limiting
    self._open_orders_cache = {}
    self._open_orders_cache_time = 0
    self._cache_timeout = 5  # seconds

    # Any credential that was not passed in is taken from the environment.
    env_fallbacks = (
        ("api_key", api_key, 'BYBIT_API_KEY'),
        ("api_secret", api_secret, 'BYBIT_API_SECRET'),
    )
    for attribute, supplied, env_name in env_fallbacks:
        if not supplied:
            setattr(self, attribute, os.getenv(env_name, ''))

    logger.info(f"Initialized BybitInterface (testnet: {test_mode})")
|
|
|
|
def connect(self) -> bool:
    """Open the pybit session and the REST fallback client, then verify access.

    The pybit session is probed first with a wallet-balance request. If that
    probe fails or raises, the raw REST client is tested for connectivity and
    authentication and, when both succeed, becomes the active client
    (``use_fallback = True``). Instruments are loaded after either success.

    Returns:
        bool: True if connection successful, False otherwise
    """
    try:
        if HTTP is None:
            logger.error("pybit library not installed")
            return False

        if not (self.api_key and self.api_secret):
            logger.error("API key and secret required for Bybit connection")
            return False

        credentials = {'api_key': self.api_key, 'api_secret': self.api_secret}

        # Primary client (pybit) and raw REST client used as a fallback
        self.session = HTTP(testnet=self.test_mode, **credentials)
        self.rest_client = BybitRestClient(testnet=self.test_mode, **credentials)

        # Probe pybit first; any failure here drops through to the fallback.
        try:
            account_info = self.session.get_wallet_balance(accountType="UNIFIED")
            if account_info.get('retCode') != 0:
                logger.warning(f"pybit connection failed: {account_info}")
                raise Exception("pybit connection failed")

            logger.info(f"Successfully connected to Bybit via pybit (testnet: {self.test_mode})")
            self.use_fallback = False
            self._load_instruments()
            return True
        except Exception as pybit_error:
            logger.warning(f"pybit failed, trying REST client fallback: {pybit_error}")

        # The fallback needs both reachability and valid credentials.
        rest_ready = (self.rest_client.test_connectivity()
                      and self.rest_client.test_authentication())
        if not rest_ready:
            logger.error("Both pybit and REST client failed")
            return False

        logger.info(f"Successfully connected to Bybit via REST client fallback (testnet: {self.test_mode})")
        self.use_fallback = True
        self._load_instruments()
        return True

    except Exception as connect_error:
        logger.error(f"Error connecting to Bybit: {connect_error}")
        return False
|
|
|
|
def _load_instruments(self) -> None:
    """Refresh ``supported_symbols`` with the symbols listed for ``self.category``.

    Failures (non-zero retCode or any exception) are logged as warnings and
    leave ``supported_symbols`` unchanged; nothing is raised.
    """
    try:
        payload = self.session.get_instruments_info(category=self.category)
        if payload.get('retCode') != 0:
            logger.warning(f"Failed to load instruments: {payload}")
            return

        listing = payload.get('result', {}).get('list', [])
        self.supported_symbols = set(entry['symbol'] for entry in listing)
        logger.info(f"Loaded {len(self.supported_symbols)} instruments")
    except Exception as load_error:
        logger.warning(f"Error loading instruments: {load_error}")
|
|
|
|
def get_instruments(self, category: str = "linear") -> List[Dict[str, Any]]:
    """Fetch the instrument list for one category.

    Args:
        category: Instrument category (linear, spot, inverse, option)

    Returns:
        The raw instrument dictionaries from the response, or an empty list
        when the request fails or raises.
    """
    try:
        reply = self.session.get_instruments_info(category=category)
        if reply.get('retCode') != 0:
            logger.error(f"Failed to get instruments: {reply}")
            return []
        return reply.get('result', {}).get('list', [])
    except Exception as fetch_error:
        logger.error(f"Error getting instruments: {fetch_error}")
        return []
|
|
|
|
def get_balance(self, asset: str) -> float:
    """Look up the balance of one asset in the UNIFIED wallet.

    The first non-empty value among ``availableToWithdraw``, ``equity`` and
    ``walletBalance`` of the matching coin entry is used.

    Args:
        asset: Asset symbol (e.g., 'BTC', 'USDT'); matched case-insensitively

    Returns:
        float: The balance found, or 0.0 when the asset is absent, the
        request fails, or an exception occurs.
    """
    try:
        wallet = self.session.get_wallet_balance(accountType="UNIFIED")
        if wallet.get('retCode') != 0:
            logger.error(f"Failed to get account balance: {wallet}")
            return 0.0

        for account in wallet.get('result', {}).get('list', []):
            for entry in account.get('coin', []):
                if entry.get('coin', '').upper() != asset.upper():
                    continue

                # Preference order: availableToWithdraw, equity, walletBalance
                raw_value = (entry.get('availableToWithdraw', '')
                             or entry.get('equity', '')
                             or entry.get('walletBalance', '0'))
                available_balance = float(raw_value) if raw_value else 0.0

                logger.debug(f"Balance for {asset}: {available_balance}")
                return available_balance

        logger.debug(f"No balance found for asset {asset}")
        return 0.0

    except Exception as balance_error:
        logger.error(f"Error getting balance for {asset}: {balance_error}")
        return 0.0
|
|
|
|
def get_account_summary(self) -> Dict[str, Any]:
    """Return the raw UNIFIED wallet-balance response.

    Returns:
        The full response dictionary when its retCode is 0; an empty
        dictionary when the request fails or raises.
    """
    try:
        wallet = self.session.get_wallet_balance(accountType="UNIFIED")
        if wallet.get('retCode') != 0:
            logger.error(f"Failed to get account summary: {wallet}")
            return {}
        return wallet
    except Exception as summary_error:
        logger.error(f"Error getting account summary: {summary_error}")
        return {}
|
|
|
|
def get_account_info(self) -> Dict[str, Any]:
    """Compatibility alias that forwards to ``get_account_summary``.

    Returns:
        Whatever ``get_account_summary`` returns.
    """
    summary = self.get_account_summary()
    return summary
|
|
|
|
def get_all_balances(self) -> Dict[str, Dict[str, float]]:
    """Collect every coin balance of the UNIFIED wallet, keyed by asset.

    ``free`` is the first non-empty value among ``availableToWithdraw``,
    ``equity`` and ``walletBalance``; ``locked`` comes from ``locked``;
    ``total`` is their sum. Empty-string values count as 0.0.

    Returns:
        Dictionary in the format
        ``{asset: {'free': float, 'locked': float, 'total': float}}``,
        or an empty dictionary when the request fails or raises.
    """
    try:
        wallet = self.session.get_wallet_balance(accountType="UNIFIED")
        if wallet.get('retCode') != 0:
            logger.error(f"Failed to get all balances: {wallet}")
            return {}

        balances = {}
        for account in wallet.get('result', {}).get('list', []):
            for entry in account.get('coin', []):
                asset = entry.get('coin', '')
                if not asset:
                    continue

                # Bybit may send "" for these fields, which float() rejects,
                # so take the first non-empty candidate.
                free_raw = (entry.get('availableToWithdraw', '')
                            or entry.get('equity', '')
                            or entry.get('walletBalance', ''))
                available = float(free_raw) if free_raw else 0.0

                locked_raw = entry.get('locked', '')
                locked = float(locked_raw) if locked_raw else 0.0

                balances[asset] = {
                    'free': available,
                    'locked': locked,
                    'total': available + locked,
                }

        logger.debug(f"Retrieved balances for {len(balances)} assets")
        return balances

    except Exception as balances_error:
        logger.error(f"Error getting all balances: {balances_error}")
        return {}
|
|
|
|
def get_ticker(self, symbol: str) -> Dict[str, Any]:
    """Get ticker information for a symbol and cache its last price.

    Args:
        symbol: Trading symbol (e.g., 'BTCUSDT')

    Returns:
        Dictionary with ``symbol``, ``last_price``, ``bid_price``,
        ``ask_price``, ``volume_24h``, ``change_24h``, ``high_24h``,
        ``low_24h`` and ``timestamp``; an empty dictionary when the request
        fails, returns no rows, or raises.
    """
    try:
        formatted_symbol = self._format_symbol(symbol)

        ticker_response = self.session.get_tickers(
            category=self.category,
            symbol=formatted_symbol,
        )

        if ticker_response.get('retCode') != 0:
            logger.error(f"Failed to get ticker for {symbol}: {ticker_response}")
            return {}

        ticker_data = ticker_response.get('result', {}).get('list', [])
        if not ticker_data:
            logger.error(f"No ticker data for {symbol}")
            return {}

        ticker = ticker_data[0]

        def as_float(field: str) -> float:
            # Missing, None and "" values all count as 0 instead of raising.
            return float(ticker.get(field) or 0)

        # Cache the last price
        last_price = as_float('lastPrice')
        self.last_price_cache[symbol] = last_price

        # The server time sits at the top level of the response, not inside
        # the ticker row; prefer a row-level value only if one is present.
        timestamp = int(ticker.get('time') or ticker_response.get('time') or 0)

        return {
            'symbol': symbol,
            'last_price': last_price,
            'bid_price': as_float('bid1Price'),
            'ask_price': as_float('ask1Price'),
            'volume_24h': as_float('volume24h'),
            'change_24h': as_float('price24hPcnt'),
            'high_24h': as_float('highPrice24h'),
            'low_24h': as_float('lowPrice24h'),
            'timestamp': timestamp,
        }

    except Exception as e:
        logger.error(f"Error getting ticker for {symbol}: {e}")
        return {}
|
|
|
|
def place_order(self, symbol: str, side: str, order_type: str,
                quantity: float, price: Optional[float] = None) -> Dict[str, Any]:
    """Place an order.

    Args:
        symbol: Trading symbol (e.g., 'BTCUSDT')
        side: 'buy' or 'sell'
        order_type: 'market' or 'limit'
        quantity: Order quantity
        price: Order price (required for limit orders)

    Returns:
        Dictionary with order information on success, or ``{'error': msg}``
        when validation fails, the exchange rejects the order, or an
        exception occurs.
    """
    try:
        from decimal import Decimal  # local import: only this method needs it

        def plain(number) -> str:
            # str(1e-05) yields '1e-05'; send fixed-point notation instead.
            return format(Decimal(str(number)), 'f')

        is_limit = order_type.lower() == 'limit'
        if is_limit and price is None:
            # Fail fast instead of submitting a limit order with no price.
            error_msg = 'Price is required for limit orders'
            logger.error(f"Failed to place order: {error_msg}")
            return {'error': error_msg}

        formatted_symbol = self._format_symbol(symbol)
        bybit_side = side.capitalize()  # 'Buy' or 'Sell'
        bybit_order_type = self._map_order_type(order_type)

        order_params = {
            'category': self.category,
            'symbol': formatted_symbol,
            'side': bybit_side,
            'orderType': bybit_order_type,
            'qty': plain(quantity),
        }

        if is_limit:
            order_params['price'] = plain(price)
            order_params['timeInForce'] = 'GTC'  # Good Till Cancelled

        response = self.session.place_order(**order_params)

        if response.get('retCode') != 0:
            error_msg = response.get('retMsg', 'Unknown error')
            logger.error(f"Failed to place order: {error_msg}")
            return {'error': error_msg}

        # A new order makes any cached open-orders snapshot stale.
        self._open_orders_cache.clear()

        result = response.get('result', {})
        order_info = {
            'order_id': result.get('orderId'),
            'symbol': symbol,
            'side': side,
            'type': order_type,
            'quantity': quantity,
            'price': price,
            'status': 'submitted',
            'timestamp': int(time.time() * 1000),
        }

        logger.info(f"Successfully placed {order_type} {side} order for {quantity} {symbol}")
        return order_info

    except Exception as e:
        logger.error(f"Error placing order: {e}")
        return {'error': str(e)}
|
|
|
|
def _process_pybit_orders(self, orders_list: List[Dict]) -> List[Dict[str, Any]]:
    """Convert raw order rows into the normalized order dictionaries.

    Missing, ``None`` or empty-string fields fall back to ``0`` / ``''`` so
    that one incomplete row cannot abort the conversion of the whole list.

    Args:
        orders_list: Raw order dictionaries from an open-orders response;
            ``None`` is treated as an empty list.

    Returns:
        One dictionary per input row with the keys ``order_id``, ``symbol``,
        ``side``, ``type``, ``quantity``, ``filled_quantity``, ``price``,
        ``status`` and ``timestamp``.
    """
    open_orders = []
    for order in orders_list or []:
        open_orders.append({
            'order_id': order.get('orderId'),
            'symbol': order.get('symbol'),
            'side': (order.get('side') or '').lower(),
            'type': (order.get('orderType') or '').lower(),
            # "value or 0" guards float('') / float(None)
            'quantity': float(order.get('qty') or 0),
            'filled_quantity': float(order.get('cumExecQty') or 0),
            'price': float(order.get('price') or 0),
            'status': self._map_order_status(order.get('orderStatus') or ''),
            'timestamp': int(order.get('createdTime') or 0),
        })
    return open_orders
|
|
|
|
def _process_rest_orders(self, orders_list: List[Dict]) -> List[Dict[str, Any]]:
    """Normalize orders returned by the REST client.

    The REST rows are handled exactly like pybit rows, so this simply
    forwards to ``_process_pybit_orders``.
    """
    normalized = self._process_pybit_orders(orders_list)
    return normalized
|
|
|
|
def cancel_order(self, symbol: str, order_id: str) -> bool:
    """Cancel an order.

    On success the cached open-orders snapshot is dropped so the cancelled
    order is not reported as open until the cache would have expired.

    Args:
        symbol: Trading symbol
        order_id: Order ID to cancel

    Returns:
        bool: True if order was cancelled successfully, False when the
        exchange rejects the request or an exception occurs.
    """
    try:
        formatted_symbol = self._format_symbol(symbol)

        response = self.session.cancel_order(
            category=self.category,
            symbol=formatted_symbol,
            orderId=order_id,
        )

        if response.get('retCode') != 0:
            error_msg = response.get('retMsg', 'Unknown error')
            logger.error(f"Failed to cancel order {order_id}: {error_msg}")
            return False

        # The cached open-orders lists may still contain this order.
        self._open_orders_cache.clear()

        logger.info(f"Successfully cancelled order {order_id}")
        return True

    except Exception as e:
        logger.error(f"Error cancelling order {order_id}: {e}")
        return False
|
|
|
|
def get_order_status(self, symbol: str, order_id: str) -> Dict[str, Any]:
    """Get status of an order.

    The open-orders endpoint is queried first; when it returns no row for
    the order, the lookup falls through to ``_get_order_from_history``.

    Args:
        symbol: Trading symbol
        order_id: Order ID

    Returns:
        Dictionary with order status information, or an empty dictionary
        when the request fails or raises.
    """
    try:
        formatted_symbol = self._format_symbol(symbol)

        response = self.session.get_open_orders(
            category=self.category,
            symbol=formatted_symbol,
            orderId=order_id,
        )

        if response.get('retCode') != 0:
            logger.error(f"Failed to get order status: {response}")
            return {}

        orders = response.get('result', {}).get('list', [])
        if not orders:
            # Order might be filled/cancelled, check order history
            return self._get_order_from_history(symbol, order_id)

        order = orders[0]

        def as_float(field: str) -> float:
            # Numeric fields can arrive as "" (e.g. avgPrice before any
            # fill); treat missing/empty values as 0 rather than raising.
            return float(order.get(field) or 0)

        return {
            'order_id': order.get('orderId'),
            'symbol': symbol,
            'side': (order.get('side') or '').lower(),
            'type': (order.get('orderType') or '').lower(),
            'quantity': as_float('qty'),
            'filled_quantity': as_float('cumExecQty'),
            'price': as_float('price'),
            'average_price': as_float('avgPrice'),
            'status': self._map_order_status(order.get('orderStatus') or ''),
            'timestamp': int(order.get('createdTime') or 0),
        }

    except Exception as e:
        logger.error(f"Error getting order status for {order_id}: {e}")
        return {}
|
|
|
|
def _get_order_from_history(self, symbol: str, order_id: str) -> Dict[str, Any]:
    """Get order from order history (for filled/cancelled orders).

    Args:
        symbol: Trading symbol
        order_id: Order ID

    Returns:
        Dictionary with order status information, or an empty dictionary
        when the order is not found, the request fails, or an exception
        occurs.
    """
    try:
        formatted_symbol = self._format_symbol(symbol)

        response = self.session.get_order_history(
            category=self.category,
            symbol=formatted_symbol,
            orderId=order_id,
        )

        if response.get('retCode') != 0:
            # Log the failure like the other lookups do instead of
            # returning an empty result with no trace.
            logger.error(f"Failed to get order history: {response}")
            return {}

        orders = response.get('result', {}).get('list', [])
        if not orders:
            return {}

        order = orders[0]

        def as_float(field: str) -> float:
            # Missing, None and "" values count as 0 instead of raising.
            return float(order.get(field) or 0)

        return {
            'order_id': order.get('orderId'),
            'symbol': symbol,
            'side': (order.get('side') or '').lower(),
            'type': (order.get('orderType') or '').lower(),
            'quantity': as_float('qty'),
            'filled_quantity': as_float('cumExecQty'),
            'price': as_float('price'),
            'average_price': as_float('avgPrice'),
            'status': self._map_order_status(order.get('orderStatus') or ''),
            'timestamp': int(order.get('createdTime') or 0),
        }

    except Exception as e:
        logger.error(f"Error getting order from history: {e}")
        return {}
|
|
|
|
def get_open_orders(self, symbol: str = None) -> List[Dict[str, Any]]:
    """Get open orders with caching and fallback to REST client.

    Results are cached per key (the symbol, or ``'all'``) for
    ``self._cache_timeout`` seconds. pybit is used unless ``use_fallback``
    is set; if the pybit call fails, the REST client is tried. Failed
    responses are never cached.

    Args:
        symbol: Trading symbol (optional, gets all if None)

    Returns:
        List of open order dictionaries; an empty list when no client is
        available, the request fails, or an exception occurs.
    """
    try:
        current_time = time.time()
        cache_key = symbol or 'all'

        # Each entry is (fetched_at, orders) so that refreshing one key
        # cannot make an older entry for another key look fresh.
        entry = self._open_orders_cache.get(cache_key)
        if isinstance(entry, tuple) and current_time - entry[0] < self._cache_timeout:
            logger.debug(f"Returning cached open orders for {cache_key}")
            return entry[1]

        formatted_symbol = self._format_symbol(symbol) if symbol else None

        # Try pybit first if not using fallback
        if not self.use_fallback and self.session:
            try:
                params = {
                    'category': self.category,
                    'openOnly': 0,  # 0 = open orders only (per the v5 API)
                }
                if formatted_symbol:
                    params['symbol'] = formatted_symbol
                elif self.category == 'linear':
                    # Linear queries need symbol, baseCoin or settleCoin;
                    # this interface targets USDT-settled contracts.
                    params['settleCoin'] = 'USDT'

                response = self.session.get_open_orders(**params)

                if response.get('retCode') != 0:
                    logger.warning(f"pybit get_open_orders failed: {response}")
                    raise Exception("pybit failed")

                orders = self._process_pybit_orders(response.get('result', {}).get('list', []))
                self._open_orders_cache[cache_key] = (current_time, orders)
                self._open_orders_cache_time = current_time  # time of last refresh
                logger.debug(f"Found {len(orders)} open orders via pybit, cached for {self._cache_timeout}s")
                return orders

            except Exception as e:
                error_str = str(e)
                if "10016" in error_str or "System error" in error_str:
                    # Stop using pybit for subsequent calls.
                    logger.warning(f"pybit rate limited (Error 10016), switching to REST fallback: {e}")
                    self.use_fallback = True
                else:
                    logger.warning(f"pybit get_open_orders error, trying REST fallback: {e}")

        # Use REST client (either as primary or fallback)
        if not self.rest_client:
            logger.error("No available API client (pybit or REST)")
            return []

        response = self.rest_client.get_open_orders(self.category, formatted_symbol)

        # Do not cache a failed response as "no open orders".
        if not isinstance(response, dict) or response.get('retCode', 0) != 0:
            logger.error(f"REST get_open_orders failed: {response}")
            return []

        orders = self._process_rest_orders(response.get('result', {}).get('list', []))
        self._open_orders_cache[cache_key] = (current_time, orders)
        self._open_orders_cache_time = current_time  # time of last refresh

        logger.debug(f"Found {len(orders)} open orders via REST client, cached for {self._cache_timeout}s")
        return orders

    except Exception as e:
        logger.error(f"Error getting open orders: {e}")
        return []
|
|
|
|
def get_positions(self, symbol: str = None) -> List[Dict[str, Any]]:
    """Get position information.

    Only positions with a non-zero size are returned.

    Args:
        symbol: Trading symbol (optional, gets all if None)

    Returns:
        List of position dictionaries; an empty list when the request
        fails or an exception occurs.
    """
    try:
        params = {'category': self.category}
        if symbol:
            params['symbol'] = self._format_symbol(symbol)
        elif self.category == 'linear':
            # Linear queries need either symbol or settleCoin; this
            # interface targets USDT-settled contracts.
            params['settleCoin'] = 'USDT'

        response = self.session.get_positions(**params)

        if response.get('retCode') != 0:
            logger.error(f"Failed to get positions: {response}")
            return []

        def as_float(row: Dict[str, Any], field: str) -> float:
            # Fields such as leverage can arrive as ""; treat missing or
            # empty values as 0 instead of letting float() raise.
            return float(row.get(field) or 0)

        position_list = []
        for pos in response.get('result', {}).get('list', []):
            size = as_float(pos, 'size')
            if size == 0:
                continue  # skip empty position slots

            position_list.append({
                'symbol': pos.get('symbol'),
                'side': (pos.get('side') or '').lower(),
                'size': size,
                'entry_price': as_float(pos, 'avgPrice'),
                'mark_price': as_float(pos, 'markPrice'),
                'unrealized_pnl': as_float(pos, 'unrealisedPnl'),
                # NOTE(review): 'unrealisedPnlPct' may not be part of the
                # position payload, in which case this is always 0 - confirm.
                'percentage': as_float(pos, 'unrealisedPnlPct'),
                'leverage': as_float(pos, 'leverage'),
                'timestamp': int(pos.get('updatedTime') or 0),
            })

        logger.debug(f"Found {len(position_list)} positions")
        return position_list

    except Exception as e:
        logger.error(f"Error getting positions: {e}")
        return []
|
|
|
|
def _format_symbol(self, symbol: str) -> str:
    """Normalize a symbol into the form sent to the Bybit API.

    ``/``, ``-`` and ``_`` are removed and the result is upper-cased;
    ``BTCUSD`` and ``ETHUSD`` are then redirected to their USDT perpetual
    counterparts.

    Args:
        symbol: Symbol in various formats (e.g. 'btc/usdt', 'ETH-USD')

    Returns:
        Formatted symbol for Bybit
    """
    # One pass that deletes every separator character
    strip_separators = str.maketrans('', '', '/-_')
    normalized = symbol.translate(strip_separators).upper()

    # USD-quoted aliases are redirected to the USDT perpetual
    usd_aliases = {
        'BTCUSD': 'BTCUSDT',
        'ETHUSD': 'ETHUSDT',
    }
    return usd_aliases.get(normalized, normalized)
|
|
|
|
def _map_order_type(self, order_type: str) -> str:
    """Map order type to Bybit format.

    Matching ignores case and surrounding whitespace. An unrecognized type
    still maps to ``'Market'`` for backward compatibility, but a warning is
    logged so the substitution is visible.

    Args:
        order_type: Order type ('market', 'limit', 'stop', 'stop_limit')

    Returns:
        Bybit order type
    """
    # NOTE(review): 'Stop' / 'StopLimit' may not be accepted as orderType
    # by the v5 API - confirm before relying on them.
    type_mapping = {
        'market': 'Market',
        'limit': 'Limit',
        'stop': 'Stop',
        'stop_limit': 'StopLimit',
    }

    normalized = order_type.strip().lower()
    if normalized not in type_mapping:
        logger.warning(f"Unknown order type {order_type!r}, defaulting to Market")
        return 'Market'
    return type_mapping[normalized]
|
|
|
|
def _map_order_status(self, status: str) -> str:
    """Translate a Bybit order status into the standardized name.

    Args:
        status: Bybit order status (e.g. 'New', 'PartiallyFilled')

    Returns:
        The standardized status for known values; any other status is
        returned lower-cased.
    """
    known_statuses = {
        'New': 'open',
        'PartiallyFilled': 'partially_filled',
        'Filled': 'filled',
        'Cancelled': 'cancelled',
        'Rejected': 'rejected',
        'PartiallyFilledCanceled': 'cancelled',
    }

    if status in known_statuses:
        return known_statuses[status]
    return status.lower()
|
|
|
|
def get_orderbook(self, symbol: str, depth: int = 25) -> Dict[str, Any]:
    """Fetch the order book for a symbol as float ``[price, size]`` pairs.

    Args:
        symbol: Trading symbol
        depth: Number of price levels to request; values above 200 are
            capped at 200

    Returns:
        Dictionary with ``symbol``, ``bids``, ``asks`` and ``timestamp``,
        or an empty dictionary when the request fails or raises.
    """
    try:
        reply = self.session.get_orderbook(
            category=self.category,
            symbol=self._format_symbol(symbol),
            limit=min(depth, 200),  # Bybit max limit is 200
        )

        if reply.get('retCode') != 0:
            logger.error(f"Failed to get orderbook for {symbol}: {reply}")
            return {}

        book = reply.get('result', {})

        # 'b' holds the bid (buy) levels and 'a' the ask (sell) levels.
        bids = [[float(level[0]), float(level[1])] for level in book.get('b', [])]
        asks = [[float(level[0]), float(level[1])] for level in book.get('a', [])]

        return {
            'symbol': symbol,
            'bids': bids,
            'asks': asks,
            'timestamp': int(book.get('ts', 0)),
        }

    except Exception as book_error:
        logger.error(f"Error getting orderbook for {symbol}: {book_error}")
        return {}
|
|
|
|
def close_position(self, symbol: str, quantity: float = None) -> Dict[str, Any]:
    """Close a position (market order in opposite direction).

    The first position returned by ``get_positions(symbol)`` is closed. A
    requested quantity larger than the open size is reduced to the open
    size, because the closing order is a plain market order and any excess
    would open a position in the opposite direction.

    Args:
        symbol: Trading symbol
        quantity: Quantity to close (None for full position); must be
            positive when given

    Returns:
        Dictionary with order information, or ``{'error': msg}`` when no
        position exists, the quantity is invalid, or an exception occurs.
    """
    try:
        # Get current position
        positions = self.get_positions(symbol)
        if not positions:
            logger.warning(f"No position found for {symbol}")
            return {'error': 'No position found'}

        position = positions[0]
        open_size = abs(position['size'])
        position_side = position['side']

        # Determine close quantity
        if quantity is None:
            close_quantity = open_size
        elif quantity <= 0:
            error_msg = 'Close quantity must be positive'
            logger.error(f"Error closing position for {symbol}: {error_msg}")
            return {'error': error_msg}
        elif quantity > open_size:
            logger.warning(
                f"Requested close quantity {quantity} exceeds position size "
                f"{open_size} for {symbol}; closing {open_size} instead"
            )
            close_quantity = open_size
        else:
            close_quantity = quantity

        # Determine opposite side
        close_side = 'sell' if position_side == 'buy' else 'buy'

        # Place market order to close position
        return self.place_order(
            symbol=symbol,
            side=close_side,
            order_type='market',
            quantity=close_quantity,
        )

    except Exception as e:
        logger.error(f"Error closing position for {symbol}: {e}")
        return {'error': str(e)}