14. finishing connectors
This commit is contained in:
@ -9,7 +9,7 @@ import json
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Dict, List, Optional, Callable, Any
|
||||
from datetime import datetime, timedelta
|
||||
from enmodels.core import ConnectionStatus, OrderBookSnapshot, TradeEvent
|
||||
from ..models.core import ConnectionStatus, OrderBookSnapshot, TradeEvent
|
||||
from ..utils.logging import get_logger, set_correlation_id
|
||||
from ..utils.exceptions import ConnectionError, ValidationError
|
||||
from ..utils.timing import get_current_timestamp
|
||||
|
@ -43,7 +43,8 @@ class BitfinexConnector(BaseExchangeConnector):
|
||||
'subscribed': self._handle_subscription_response,
|
||||
'unsubscribed': self._handle_unsubscription_response,
|
||||
'error': self._handle_error_message,
|
||||
'info': self._handle_info_message
|
||||
'info': self._handle_info_message,
|
||||
'data': self._handle_data_message
|
||||
})
|
||||
|
||||
# Channel management
|
||||
@ -150,13 +151,75 @@ class BitfinexConnector(BaseExchangeConnector):
|
||||
|
||||
async def unsubscribe_orderbook(self, symbol: str) -> None:
|
||||
"""Unsubscribe from order book updates."""
|
||||
# Implementation would find the channel ID and send unsubscribe message
|
||||
pass
|
||||
try:
|
||||
bitfinex_symbol = self.normalize_symbol(symbol)
|
||||
|
||||
# Find channel ID for this symbol's order book
|
||||
channel_id = None
|
||||
for cid, info in self.channels.items():
|
||||
if info.get('channel') == 'book' and info.get('symbol') == bitfinex_symbol:
|
||||
channel_id = cid
|
||||
break
|
||||
|
||||
if channel_id:
|
||||
unsubscription_msg = {
|
||||
"event": "unsubscribe",
|
||||
"chanId": channel_id
|
||||
}
|
||||
|
||||
success = await self._send_message(unsubscription_msg)
|
||||
if success:
|
||||
if symbol in self.subscriptions and 'orderbook' in self.subscriptions[symbol]:
|
||||
self.subscriptions[symbol].remove('orderbook')
|
||||
if not self.subscriptions[symbol]:
|
||||
del self.subscriptions[symbol]
|
||||
|
||||
self.subscribed_symbols.discard(bitfinex_symbol)
|
||||
logger.info(f"Unsubscribed from order book for {symbol} on Bitfinex")
|
||||
else:
|
||||
logger.error(f"Failed to unsubscribe from order book for {symbol} on Bitfinex")
|
||||
else:
|
||||
logger.warning(f"No active order book subscription found for {symbol}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error unsubscribing from order book for {symbol}: {e}")
|
||||
raise
|
||||
|
||||
async def unsubscribe_trades(self, symbol: str) -> None:
|
||||
"""Unsubscribe from trade updates."""
|
||||
# Implementation would find the channel ID and send unsubscribe message
|
||||
pass
|
||||
try:
|
||||
bitfinex_symbol = self.normalize_symbol(symbol)
|
||||
|
||||
# Find channel ID for this symbol's trades
|
||||
channel_id = None
|
||||
for cid, info in self.channels.items():
|
||||
if info.get('channel') == 'trades' and info.get('symbol') == bitfinex_symbol:
|
||||
channel_id = cid
|
||||
break
|
||||
|
||||
if channel_id:
|
||||
unsubscription_msg = {
|
||||
"event": "unsubscribe",
|
||||
"chanId": channel_id
|
||||
}
|
||||
|
||||
success = await self._send_message(unsubscription_msg)
|
||||
if success:
|
||||
if symbol in self.subscriptions and 'trades' in self.subscriptions[symbol]:
|
||||
self.subscriptions[symbol].remove('trades')
|
||||
if not self.subscriptions[symbol]:
|
||||
del self.subscriptions[symbol]
|
||||
|
||||
self.subscribed_symbols.discard(bitfinex_symbol)
|
||||
logger.info(f"Unsubscribed from trades for {symbol} on Bitfinex")
|
||||
else:
|
||||
logger.error(f"Failed to unsubscribe from trades for {symbol} on Bitfinex")
|
||||
else:
|
||||
logger.warning(f"No active trades subscription found for {symbol}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error unsubscribing from trades for {symbol}: {e}")
|
||||
raise
|
||||
|
||||
async def get_symbols(self) -> List[str]:
|
||||
"""Get available symbols from Bitfinex."""
|
||||
@ -256,6 +319,127 @@ class BitfinexConnector(BaseExchangeConnector):
|
||||
"""Handle info message."""
|
||||
logger.info(f"Bitfinex info: {data}")
|
||||
|
||||
async def _handle_data_message(self, data: List) -> None:
|
||||
"""Handle data message from Bitfinex."""
|
||||
try:
|
||||
if len(data) < 2:
|
||||
return
|
||||
|
||||
channel_id = data[0]
|
||||
message_data = data[1]
|
||||
|
||||
if channel_id not in self.channels:
|
||||
logger.warning(f"Received data for unknown channel: {channel_id}")
|
||||
return
|
||||
|
||||
channel_info = self.channels[channel_id]
|
||||
channel_type = channel_info.get('channel')
|
||||
symbol = channel_info.get('symbol', '')
|
||||
|
||||
if channel_type == 'book':
|
||||
await self._handle_orderbook_data(message_data, symbol)
|
||||
elif channel_type == 'trades':
|
||||
await self._handle_trades_data(message_data, symbol)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling data message: {e}")
|
||||
|
||||
async def _handle_orderbook_data(self, data, symbol: str) -> None:
|
||||
"""Handle order book data from Bitfinex."""
|
||||
try:
|
||||
set_correlation_id()
|
||||
|
||||
if not isinstance(data, list):
|
||||
return
|
||||
|
||||
standard_symbol = self._denormalize_symbol(symbol)
|
||||
|
||||
# Handle snapshot vs update
|
||||
if len(data) > 0 and isinstance(data[0], list):
|
||||
# Snapshot - array of [price, count, amount]
|
||||
bids = []
|
||||
asks = []
|
||||
|
||||
for level in data:
|
||||
if len(level) >= 3:
|
||||
price = float(level[0])
|
||||
count = int(level[1])
|
||||
amount = float(level[2])
|
||||
|
||||
if validate_price(price) and validate_volume(abs(amount)):
|
||||
if amount > 0:
|
||||
bids.append(PriceLevel(price=price, size=amount))
|
||||
else:
|
||||
asks.append(PriceLevel(price=price, size=abs(amount)))
|
||||
|
||||
orderbook = OrderBookSnapshot(
|
||||
symbol=standard_symbol,
|
||||
exchange=self.exchange_name,
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
bids=bids,
|
||||
asks=asks
|
||||
)
|
||||
|
||||
self._notify_data_callbacks(orderbook)
|
||||
logger.debug(f"Processed order book snapshot for {standard_symbol}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling order book data: {e}")
|
||||
|
||||
async def _handle_trades_data(self, data, symbol: str) -> None:
|
||||
"""Handle trades data from Bitfinex."""
|
||||
try:
|
||||
set_correlation_id()
|
||||
|
||||
if not isinstance(data, list):
|
||||
return
|
||||
|
||||
standard_symbol = self._denormalize_symbol(symbol)
|
||||
|
||||
# Handle snapshot vs update
|
||||
if len(data) > 0 and isinstance(data[0], list):
|
||||
# Snapshot - array of trades
|
||||
for trade_data in data:
|
||||
await self._process_single_trade(trade_data, standard_symbol)
|
||||
elif len(data) >= 4:
|
||||
# Single trade update
|
||||
await self._process_single_trade(data, standard_symbol)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling trades data: {e}")
|
||||
|
||||
async def _process_single_trade(self, trade_data: List, symbol: str) -> None:
|
||||
"""Process a single trade from Bitfinex."""
|
||||
try:
|
||||
if len(trade_data) < 4:
|
||||
return
|
||||
|
||||
trade_id = str(trade_data[0])
|
||||
timestamp = int(trade_data[1]) / 1000 # Convert to seconds
|
||||
amount = float(trade_data[2])
|
||||
price = float(trade_data[3])
|
||||
|
||||
if not validate_price(price) or not validate_volume(abs(amount)):
|
||||
return
|
||||
|
||||
side = 'buy' if amount > 0 else 'sell'
|
||||
|
||||
trade = TradeEvent(
|
||||
symbol=symbol,
|
||||
exchange=self.exchange_name,
|
||||
timestamp=datetime.fromtimestamp(timestamp, tz=timezone.utc),
|
||||
price=price,
|
||||
size=abs(amount),
|
||||
side=side,
|
||||
trade_id=trade_id
|
||||
)
|
||||
|
||||
self._notify_data_callbacks(trade)
|
||||
logger.debug(f"Processed trade for {symbol}: {side} {abs(amount)} @ {price}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing single trade: {e}")
|
||||
|
||||
def get_bitfinex_stats(self) -> Dict[str, Any]:
|
||||
"""Get Bitfinex-specific statistics."""
|
||||
base_stats = self.get_stats()
|
||||
|
@ -251,23 +251,161 @@ class MEXCConnector(BaseExchangeConnector):
|
||||
|
||||
async def _handle_orderbook_update(self, data: Dict) -> None:
|
||||
"""Handle order book update from MEXC."""
|
||||
# Implementation would parse MEXC-specific order book update format
|
||||
logger.debug("Received MEXC order book update")
|
||||
try:
|
||||
set_correlation_id()
|
||||
|
||||
symbol_data = data.get('s', '') # Symbol
|
||||
if not symbol_data:
|
||||
logger.warning("Order book update missing symbol")
|
||||
return
|
||||
|
||||
symbol = symbol_data # Already in standard format
|
||||
order_data = data.get('d', {})
|
||||
|
||||
# Parse bids and asks
|
||||
bids = []
|
||||
for bid_data in order_data.get('bids', []):
|
||||
price = float(bid_data[0])
|
||||
size = float(bid_data[1])
|
||||
|
||||
if validate_price(price) and validate_volume(size):
|
||||
bids.append(PriceLevel(price=price, size=size))
|
||||
|
||||
asks = []
|
||||
for ask_data in order_data.get('asks', []):
|
||||
price = float(ask_data[0])
|
||||
size = float(ask_data[1])
|
||||
|
||||
if validate_price(price) and validate_volume(size):
|
||||
asks.append(PriceLevel(price=price, size=size))
|
||||
|
||||
# Create order book snapshot
|
||||
orderbook = OrderBookSnapshot(
|
||||
symbol=symbol,
|
||||
exchange=self.exchange_name,
|
||||
timestamp=datetime.fromtimestamp(int(data.get('t', 0)) / 1000, tz=timezone.utc),
|
||||
bids=bids,
|
||||
asks=asks,
|
||||
sequence_id=order_data.get('lastUpdateId')
|
||||
)
|
||||
|
||||
# Notify callbacks
|
||||
self._notify_data_callbacks(orderbook)
|
||||
|
||||
logger.debug(f"Processed order book update for {symbol}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling order book update: {e}")
|
||||
|
||||
async def _handle_orderbook_snapshot(self, data: Dict) -> None:
|
||||
"""Handle order book snapshot from MEXC."""
|
||||
# Implementation would parse MEXC-specific order book snapshot format
|
||||
logger.debug("Received MEXC order book snapshot")
|
||||
try:
|
||||
set_correlation_id()
|
||||
|
||||
symbol_data = data.get('s', '') # Symbol
|
||||
if not symbol_data:
|
||||
logger.warning("Order book snapshot missing symbol")
|
||||
return
|
||||
|
||||
symbol = symbol_data # Already in standard format
|
||||
order_data = data.get('d', {})
|
||||
|
||||
# Parse bids and asks
|
||||
bids = []
|
||||
for bid_data in order_data.get('bids', []):
|
||||
price = float(bid_data[0])
|
||||
size = float(bid_data[1])
|
||||
|
||||
if validate_price(price) and validate_volume(size):
|
||||
bids.append(PriceLevel(price=price, size=size))
|
||||
|
||||
asks = []
|
||||
for ask_data in order_data.get('asks', []):
|
||||
price = float(ask_data[0])
|
||||
size = float(ask_data[1])
|
||||
|
||||
if validate_price(price) and validate_volume(size):
|
||||
asks.append(PriceLevel(price=price, size=size))
|
||||
|
||||
# Create order book snapshot
|
||||
orderbook = OrderBookSnapshot(
|
||||
symbol=symbol,
|
||||
exchange=self.exchange_name,
|
||||
timestamp=datetime.fromtimestamp(int(data.get('t', 0)) / 1000, tz=timezone.utc),
|
||||
bids=bids,
|
||||
asks=asks,
|
||||
sequence_id=order_data.get('lastUpdateId')
|
||||
)
|
||||
|
||||
# Notify callbacks
|
||||
self._notify_data_callbacks(orderbook)
|
||||
|
||||
logger.debug(f"Processed order book snapshot for {symbol}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling order book snapshot: {e}")
|
||||
|
||||
async def _handle_trade_update(self, data: Dict) -> None:
|
||||
"""Handle trade update from MEXC."""
|
||||
# Implementation would parse MEXC-specific trade format
|
||||
logger.debug("Received MEXC trade update")
|
||||
try:
|
||||
set_correlation_id()
|
||||
|
||||
symbol_data = data.get('s', '') # Symbol
|
||||
if not symbol_data:
|
||||
logger.warning("Trade update missing symbol")
|
||||
return
|
||||
|
||||
symbol = symbol_data # Already in standard format
|
||||
trade_data = data.get('d', {})
|
||||
|
||||
# MEXC trade data format
|
||||
trades = trade_data.get('deals', [])
|
||||
|
||||
for trade_info in trades:
|
||||
price = float(trade_info.get('p', 0))
|
||||
quantity = float(trade_info.get('v', 0))
|
||||
|
||||
# Validate data
|
||||
if not validate_price(price) or not validate_volume(quantity):
|
||||
logger.warning(f"Invalid trade data: price={price}, quantity={quantity}")
|
||||
continue
|
||||
|
||||
# Determine side (MEXC uses 'S' field: 1=buy, 2=sell)
|
||||
side_code = trade_info.get('S', 0)
|
||||
side = 'buy' if side_code == 1 else 'sell'
|
||||
|
||||
# Create trade event
|
||||
trade = TradeEvent(
|
||||
symbol=symbol,
|
||||
exchange=self.exchange_name,
|
||||
timestamp=datetime.fromtimestamp(int(trade_info.get('t', 0)) / 1000, tz=timezone.utc),
|
||||
price=price,
|
||||
size=quantity,
|
||||
side=side,
|
||||
trade_id=str(trade_info.get('i', ''))
|
||||
)
|
||||
|
||||
# Notify callbacks
|
||||
self._notify_data_callbacks(trade)
|
||||
|
||||
logger.debug(f"Processed trade for {symbol}: {side} {quantity} @ {price}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling trade update: {e}")
|
||||
|
||||
async def _handle_pong(self, data: Dict) -> None:
|
||||
"""Handle pong response from MEXC."""
|
||||
logger.debug("Received MEXC pong")
|
||||
|
||||
async def _send_ping(self) -> None:
|
||||
"""Send ping to keep connection alive."""
|
||||
try:
|
||||
ping_msg = {"method": "PING"}
|
||||
await self._send_message(ping_msg)
|
||||
logger.debug("Sent ping to MEXC")
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending ping: {e}")
|
||||
|
||||
def get_mexc_stats(self) -> Dict[str, Any]:
|
||||
"""Get MEXC-specific statistics."""
|
||||
base_stats = self.get_stats()
|
||||
|
Reference in New Issue
Block a user