Files
gogo2/test_cob_data_format.py
Dobromir Popov fab25ffe6f wip....
2025-06-27 03:48:48 +03:00

69 lines
2.3 KiB
Python

#!/usr/bin/env python3
"""
Test COB Data Format - Check what data is actually available
"""
import time
import asyncio
from core.multi_exchange_cob_provider import MultiExchangeCOBProvider
async def test_cob_data_format() -> None:
    """Test what COB data format is actually available.

    Builds a ``MultiExchangeCOBProvider`` for ETH/USDT and BTC/USDT,
    subscribes a callback that prints a summary of every snapshot it is
    handed, starts streaming and waits up to 30 seconds for the first
    snapshot to show up. The outcome is reported on stdout; nothing is
    returned.

    The provider is always asked to stop streaming once it has been
    started, even if the wait is interrupted by an error or cancellation.
    """
    print("=== COB DATA FORMAT TEST ===")

    # Create COB provider directly (same as dashboard)
    cob_provider = MultiExchangeCOBProvider(
        symbols=['ETH/USDT', 'BTC/USDT'],
        bucket_size_bps=1.0
    )

    # Most recent snapshot seen per symbol; filled in by the callback below.
    captured_data = {}

    def capture_callback(symbol: str, cob_snapshot):
        """Store the snapshot for *symbol* and print a short summary of it."""
        captured_data[symbol] = cob_snapshot
        print(f"Captured COB data for {symbol}:")
        print(f" Type: {type(cob_snapshot)}")
        print(f" Attributes: {dir(cob_snapshot)}")

        # Check key attributes. Each one is probed with hasattr because the
        # point of this script is to discover which of them actually exist.
        if hasattr(cob_snapshot, 'consolidated_bids'):
            print(f" Bids count: {len(cob_snapshot.consolidated_bids)}")
        if hasattr(cob_snapshot, 'consolidated_asks'):
            print(f" Asks count: {len(cob_snapshot.consolidated_asks)}")
        if hasattr(cob_snapshot, 'spread_bps'):
            print(f" Spread: {cob_snapshot.spread_bps}")
        if hasattr(cob_snapshot, 'exchanges_active'):
            print(f" Active exchanges: {len(cob_snapshot.exchanges_active)}")
        print()

    cob_provider.subscribe_to_cob_updates(capture_callback)

    # Start COB provider
    print("Starting COB provider...")
    await cob_provider.start_streaming()

    try:
        # Wait for data: poll once per second, for at most 30 seconds.
        print("Waiting for COB data...")
        for elapsed in range(30):
            # Check before sleeping so a snapshot that has already arrived
            # ends the wait immediately.
            if captured_data:
                break
            # Printed before the sleep so the number is the time actually
            # waited so far.
            if elapsed % 5 == 0:
                print(f" Waiting... {elapsed}s")
            await asyncio.sleep(1)

        if captured_data:
            print("SUCCESS: COB data captured!")
            for symbol, cob_snapshot in captured_data.items():
                print(f"\n{symbol} COB snapshot:")
                print(f" Type: {type(cob_snapshot)}")
                print(f" Has consolidated_bids: {hasattr(cob_snapshot, 'consolidated_bids')}")
                print(f" Has consolidated_asks: {hasattr(cob_snapshot, 'consolidated_asks')}")
        else:
            print("No COB data captured")
    finally:
        # Stop COB provider, also when the wait above raised or was
        # cancelled, so the stream is not left running.
        await cob_provider.stop_streaming()
if __name__ == "__main__":
    # Script entry point: run the async check to completion on a fresh
    # event loop. Skipped when the module is merely imported.
    asyncio.run(test_cob_data_format())