improve training and model data
This commit is contained in:
@ -661,22 +661,315 @@ class MultiExchangeCOBProvider:
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Binance order book for {symbol}: {e}", exc_info=True)
|
||||
|
||||
async def _stream_coinbase_orderbook(self, symbol: str, config: ExchangeConfig):
|
||||
"""Stream Coinbase order book data (placeholder implementation)"""
|
||||
async def _process_coinbase_orderbook(self, symbol: str, data: Dict):
|
||||
"""Process Coinbase order book data"""
|
||||
try:
|
||||
# For now, just log that Coinbase streaming is not implemented
|
||||
logger.info(f"Coinbase streaming for {symbol} not yet implemented")
|
||||
await asyncio.sleep(60) # Sleep to prevent spam
|
||||
if data.get('type') == 'snapshot':
|
||||
# Initial snapshot
|
||||
bids = {}
|
||||
asks = {}
|
||||
|
||||
for bid_data in data.get('bids', []):
|
||||
price, size = float(bid_data[0]), float(bid_data[1])
|
||||
if size > 0:
|
||||
bids[price] = ExchangeOrderBookLevel(
|
||||
exchange='coinbase',
|
||||
price=price,
|
||||
size=size,
|
||||
volume_usd=price * size,
|
||||
orders_count=1, # Coinbase doesn't provide order count
|
||||
side='bid',
|
||||
timestamp=datetime.now(),
|
||||
raw_data=bid_data
|
||||
)
|
||||
|
||||
for ask_data in data.get('asks', []):
|
||||
price, size = float(ask_data[0]), float(ask_data[1])
|
||||
if size > 0:
|
||||
asks[price] = ExchangeOrderBookLevel(
|
||||
exchange='coinbase',
|
||||
price=price,
|
||||
size=size,
|
||||
volume_usd=price * size,
|
||||
orders_count=1,
|
||||
side='ask',
|
||||
timestamp=datetime.now(),
|
||||
raw_data=ask_data
|
||||
)
|
||||
|
||||
# Update order book
|
||||
async with self.data_lock:
|
||||
if symbol not in self.exchange_order_books:
|
||||
self.exchange_order_books[symbol] = {}
|
||||
|
||||
self.exchange_order_books[symbol]['coinbase'] = {
|
||||
'bids': bids,
|
||||
'asks': asks,
|
||||
'last_update': datetime.now(),
|
||||
'connected': True
|
||||
}
|
||||
|
||||
logger.info(f"Coinbase snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks")
|
||||
|
||||
elif data.get('type') == 'l2update':
|
||||
# Level 2 update
|
||||
async with self.data_lock:
|
||||
if symbol in self.exchange_order_books and 'coinbase' in self.exchange_order_books[symbol]:
|
||||
coinbase_data = self.exchange_order_books[symbol]['coinbase']
|
||||
|
||||
for change in data.get('changes', []):
|
||||
side, price_str, size_str = change
|
||||
price, size = float(price_str), float(size_str)
|
||||
|
||||
if side == 'buy':
|
||||
if size == 0:
|
||||
# Remove level
|
||||
coinbase_data['bids'].pop(price, None)
|
||||
else:
|
||||
# Update level
|
||||
coinbase_data['bids'][price] = ExchangeOrderBookLevel(
|
||||
exchange='coinbase',
|
||||
price=price,
|
||||
size=size,
|
||||
volume_usd=price * size,
|
||||
orders_count=1,
|
||||
side='bid',
|
||||
timestamp=datetime.now(),
|
||||
raw_data=change
|
||||
)
|
||||
elif side == 'sell':
|
||||
if size == 0:
|
||||
# Remove level
|
||||
coinbase_data['asks'].pop(price, None)
|
||||
else:
|
||||
# Update level
|
||||
coinbase_data['asks'][price] = ExchangeOrderBookLevel(
|
||||
exchange='coinbase',
|
||||
price=price,
|
||||
size=size,
|
||||
volume_usd=price * size,
|
||||
orders_count=1,
|
||||
side='ask',
|
||||
timestamp=datetime.now(),
|
||||
raw_data=change
|
||||
)
|
||||
|
||||
coinbase_data['last_update'] = datetime.now()
|
||||
|
||||
# Update exchange count
|
||||
exchange_name = 'coinbase'
|
||||
if exchange_name not in self.exchange_update_counts:
|
||||
self.exchange_update_counts[exchange_name] = 0
|
||||
self.exchange_update_counts[exchange_name] += 1
|
||||
|
||||
# Log every 1000th update
|
||||
if self.exchange_update_counts[exchange_name] % 1000 == 0:
|
||||
logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Coinbase updates for {symbol}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error streaming Coinbase order book for {symbol}: {e}")
|
||||
logger.error(f"Error processing Coinbase order book for {symbol}: {e}", exc_info=True)
|
||||
|
||||
async def _process_kraken_orderbook(self, symbol: str, data: Dict):
|
||||
"""Process Kraken order book data"""
|
||||
try:
|
||||
# Kraken sends different message types
|
||||
if isinstance(data, list) and len(data) > 1:
|
||||
# Order book update format: [channel_id, data, channel_name, pair]
|
||||
if len(data) >= 4 and data[2] == "book-25":
|
||||
book_data = data[1]
|
||||
|
||||
# Check for snapshot vs update
|
||||
if 'bs' in book_data and 'as' in book_data:
|
||||
# Snapshot
|
||||
bids = {}
|
||||
asks = {}
|
||||
|
||||
for bid_data in book_data.get('bs', []):
|
||||
price, volume, timestamp = float(bid_data[0]), float(bid_data[1]), float(bid_data[2])
|
||||
if volume > 0:
|
||||
bids[price] = ExchangeOrderBookLevel(
|
||||
exchange='kraken',
|
||||
price=price,
|
||||
size=volume,
|
||||
volume_usd=price * volume,
|
||||
orders_count=1, # Kraken doesn't provide order count in book feed
|
||||
side='bid',
|
||||
timestamp=datetime.fromtimestamp(timestamp),
|
||||
raw_data=bid_data
|
||||
)
|
||||
|
||||
for ask_data in book_data.get('as', []):
|
||||
price, volume, timestamp = float(ask_data[0]), float(ask_data[1]), float(ask_data[2])
|
||||
if volume > 0:
|
||||
asks[price] = ExchangeOrderBookLevel(
|
||||
exchange='kraken',
|
||||
price=price,
|
||||
size=volume,
|
||||
volume_usd=price * volume,
|
||||
orders_count=1,
|
||||
side='ask',
|
||||
timestamp=datetime.fromtimestamp(timestamp),
|
||||
raw_data=ask_data
|
||||
)
|
||||
|
||||
# Update order book
|
||||
async with self.data_lock:
|
||||
if symbol not in self.exchange_order_books:
|
||||
self.exchange_order_books[symbol] = {}
|
||||
|
||||
self.exchange_order_books[symbol]['kraken'] = {
|
||||
'bids': bids,
|
||||
'asks': asks,
|
||||
'last_update': datetime.now(),
|
||||
'connected': True
|
||||
}
|
||||
|
||||
logger.info(f"Kraken snapshot for {symbol}: {len(bids)} bids, {len(asks)} asks")
|
||||
|
||||
else:
|
||||
# Incremental update
|
||||
async with self.data_lock:
|
||||
if symbol in self.exchange_order_books and 'kraken' in self.exchange_order_books[symbol]:
|
||||
kraken_data = self.exchange_order_books[symbol]['kraken']
|
||||
|
||||
# Process bid updates
|
||||
for bid_update in book_data.get('b', []):
|
||||
price, volume, timestamp = float(bid_update[0]), float(bid_update[1]), float(bid_update[2])
|
||||
if volume == 0:
|
||||
# Remove level
|
||||
kraken_data['bids'].pop(price, None)
|
||||
else:
|
||||
# Update level
|
||||
kraken_data['bids'][price] = ExchangeOrderBookLevel(
|
||||
exchange='kraken',
|
||||
price=price,
|
||||
size=volume,
|
||||
volume_usd=price * volume,
|
||||
orders_count=1,
|
||||
side='bid',
|
||||
timestamp=datetime.fromtimestamp(timestamp),
|
||||
raw_data=bid_update
|
||||
)
|
||||
|
||||
# Process ask updates
|
||||
for ask_update in book_data.get('a', []):
|
||||
price, volume, timestamp = float(ask_update[0]), float(ask_update[1]), float(ask_update[2])
|
||||
if volume == 0:
|
||||
# Remove level
|
||||
kraken_data['asks'].pop(price, None)
|
||||
else:
|
||||
# Update level
|
||||
kraken_data['asks'][price] = ExchangeOrderBookLevel(
|
||||
exchange='kraken',
|
||||
price=price,
|
||||
size=volume,
|
||||
volume_usd=price * volume,
|
||||
orders_count=1,
|
||||
side='ask',
|
||||
timestamp=datetime.fromtimestamp(timestamp),
|
||||
raw_data=ask_update
|
||||
)
|
||||
|
||||
kraken_data['last_update'] = datetime.now()
|
||||
|
||||
# Update exchange count
|
||||
exchange_name = 'kraken'
|
||||
if exchange_name not in self.exchange_update_counts:
|
||||
self.exchange_update_counts[exchange_name] = 0
|
||||
self.exchange_update_counts[exchange_name] += 1
|
||||
|
||||
# Log every 1000th update
|
||||
if self.exchange_update_counts[exchange_name] % 1000 == 0:
|
||||
logger.info(f"Processed {self.exchange_update_counts[exchange_name]} Kraken updates for {symbol}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Kraken order book for {symbol}: {e}", exc_info=True)
|
||||
|
||||
async def _stream_coinbase_orderbook(self, symbol: str, config: ExchangeConfig):
|
||||
"""Stream Coinbase order book data via WebSocket"""
|
||||
try:
|
||||
import json
|
||||
if websockets is None or websockets_connect is None:
|
||||
raise ImportError("websockets module not available")
|
||||
|
||||
# Coinbase Pro WebSocket URL
|
||||
ws_url = "wss://ws-feed.pro.coinbase.com"
|
||||
coinbase_symbol = config.symbols_mapping.get(symbol, symbol.replace('/', '-'))
|
||||
|
||||
# Subscribe message for level2 order book updates
|
||||
subscribe_message = {
|
||||
"type": "subscribe",
|
||||
"product_ids": [coinbase_symbol],
|
||||
"channels": ["level2"]
|
||||
}
|
||||
|
||||
logger.info(f"Connecting to Coinbase order book stream for {symbol}")
|
||||
|
||||
async with websockets_connect(ws_url) as websocket:
|
||||
# Send subscription
|
||||
await websocket.send(json.dumps(subscribe_message))
|
||||
logger.info(f"Subscribed to Coinbase level2 for {coinbase_symbol}")
|
||||
|
||||
async for message in websocket:
|
||||
if not self.is_streaming:
|
||||
break
|
||||
|
||||
try:
|
||||
data = json.loads(message)
|
||||
await self._process_coinbase_orderbook(symbol, data)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Error parsing Coinbase message: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Coinbase orderbook: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Coinbase order book stream error for {symbol}: {e}")
|
||||
finally:
|
||||
logger.info(f"Disconnected from Coinbase order book stream for {symbol}")
|
||||
|
||||
async def _stream_kraken_orderbook(self, symbol: str, config: ExchangeConfig):
|
||||
"""Stream Kraken order book data (placeholder implementation)"""
|
||||
"""Stream Kraken order book data via WebSocket"""
|
||||
try:
|
||||
logger.info(f"Kraken streaming for {symbol} not yet implemented")
|
||||
await asyncio.sleep(60) # Sleep to prevent spam
|
||||
import json
|
||||
if websockets is None or websockets_connect is None:
|
||||
raise ImportError("websockets module not available")
|
||||
|
||||
# Kraken WebSocket URL
|
||||
ws_url = "wss://ws.kraken.com"
|
||||
kraken_symbol = config.symbols_mapping.get(symbol, symbol.replace('/', ''))
|
||||
|
||||
# Subscribe message for book updates
|
||||
subscribe_message = {
|
||||
"event": "subscribe",
|
||||
"pair": [kraken_symbol],
|
||||
"subscription": {"name": "book", "depth": 25}
|
||||
}
|
||||
|
||||
logger.info(f"Connecting to Kraken order book stream for {symbol}")
|
||||
|
||||
async with websockets_connect(ws_url) as websocket:
|
||||
# Send subscription
|
||||
await websocket.send(json.dumps(subscribe_message))
|
||||
logger.info(f"Subscribed to Kraken book for {kraken_symbol}")
|
||||
|
||||
async for message in websocket:
|
||||
if not self.is_streaming:
|
||||
break
|
||||
|
||||
try:
|
||||
data = json.loads(message)
|
||||
await self._process_kraken_orderbook(symbol, data)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Error parsing Kraken message: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Kraken orderbook: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error streaming Kraken order book for {symbol}: {e}")
|
||||
logger.error(f"Kraken order book stream error for {symbol}: {e}")
|
||||
finally:
|
||||
logger.info(f"Disconnected from Kraken order book stream for {symbol}")
|
||||
|
||||
async def _stream_huobi_orderbook(self, symbol: str, config: ExchangeConfig):
|
||||
"""Stream Huobi order book data (placeholder implementation)"""
|
||||
|
Reference in New Issue
Block a user