Files
gogo2/test_free_orderbook_integration.py
Dobromir Popov 3cadae60f7 COB data and dash
2025-06-18 16:23:47 +03:00

187 lines
15 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Test Free Order Book Integration with Session Volume Profile
This script demonstrates how to use free Binance data to create
Bookmap-style functionality including:
- Real-time order book monitoring
- Session Volume Profile (SVP) calculation
- Order flow detection
- Market profile analysis
- CNN/DQN model integration
No paid APIs required!
"""
import asyncio
import logging
import time
import numpy as np
from datetime import datetime, timedelta
from core.bookmap_integration import BookmapIntegration
# Configure logging: INFO and above goes to both the console and a log file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        # The demo logs emoji. Without an explicit encoding the file handler
        # uses the platform's locale encoding (e.g. cp1252 on Windows), which
        # cannot represent them, so the affected records would fail to write.
        logging.FileHandler('orderbook_test.log', encoding='utf-8'),
    ],
)
logger = logging.getLogger(__name__)
class OrderBookIntegrationDemo:
    """Demo driver for the free order book integration.

    Creates a ``BookmapIntegration`` for a fixed list of symbols, registers
    two logging callbacks on it (CNN features and DQN state) and, while the
    demo runs, periodically logs a status report for every symbol.
    """

    # Seconds to wait after starting the stream before the first report.
    WARMUP_SECONDS = 5
    # Seconds between two consecutive status reports.
    STATUS_INTERVAL_SECONDS = 30

    def __init__(self):
        """Create the integration and register the model callbacks."""
        self.symbols = ['ETHUSDT', 'BTCUSDT']
        self.integration = BookmapIntegration(symbols=self.symbols)
        # Set by run_demo(); clearing it ends the status loop after the
        # current iteration.
        self.running = False

        # Add model callbacks
        self.integration.add_cnn_callback(self.cnn_feature_callback)
        self.integration.add_dqn_callback(self.dqn_feature_callback)

        logger.info("Free Order Book Integration Demo initialized")
        logger.info(f"Monitoring symbols: {self.symbols}")

    def cnn_feature_callback(self, symbol: str, data: dict):
        """Log the size and the first ten values of the CNN feature vector.

        Args:
            symbol: Symbol the features belong to.
            data: Callback payload; only the ``'features'`` entry is read.
                Nothing is logged when it is missing or ``None``.
        """
        features = data.get('features')
        if features is None:
            return
        logger.info(f"CNN Features for {symbol}: {len(features)} dimensions")
        # np.asarray leaves an ndarray untouched and lets a plain sequence
        # (list/tuple) be rounded as well instead of raising AttributeError.
        logger.info(f" Order book features: {np.asarray(features)[:10].round(4)}")

    def dqn_feature_callback(self, symbol: str, data: dict):
        """Log the size and the first five values of the DQN state vector.

        Args:
            symbol: Symbol the state belongs to.
            data: Callback payload; only the ``'state'`` entry is read.
                Nothing is logged when it is missing or ``None``.
        """
        state = data.get('state')
        if state is None:
            return
        logger.info(f"DQN State for {symbol}: {len(state)} dimensions")
        # Same reasoning as in cnn_feature_callback: accept any sequence.
        logger.info(f" State features: {np.asarray(state)[:5].round(4)}")

    async def run_demo(self, duration_minutes: int = 5):
        """Stream data and log a status report until the duration elapses.

        The duration is measured from the end of the warm-up wait. Errors are
        logged (with traceback) and not re-raised; ``cleanup()`` always runs.

        Args:
            duration_minutes: How long to keep reporting, in minutes.
        """
        logger.info(f"Starting {duration_minutes}-minute demo")
        logger.info("=" * 60)

        try:
            # Start streaming
            await self.integration.start_streaming()
            self.running = True

            # Wait for initial data
            logger.info("Waiting for initial order book data...")
            await asyncio.sleep(self.WARMUP_SECONDS)

            # A monotonic clock keeps the deadline immune to wall-clock
            # adjustments (NTP sync, DST, manual changes).
            deadline = time.monotonic() + duration_minutes * 60
            while self.running and time.monotonic() < deadline:
                # Display current status
                await self.display_status()

                # Wait before the next update, but never sleep past the
                # deadline so the demo does not overrun its duration.
                pause = min(self.STATUS_INTERVAL_SECONDS, deadline - time.monotonic())
                await asyncio.sleep(max(0.0, pause))

        except KeyboardInterrupt:
            logger.info("Demo interrupted by user")
        except Exception as e:
            # logger.exception keeps the traceback, which a plain error
            # message would lose.
            logger.exception(f"Demo error: {e}")
        finally:
            await self.cleanup()

    async def display_status(self):
        """Log order book, session and market profile status per symbol."""
        logger.info("\n" + "=" * 60)
        logger.info("CURRENT STATUS")
        logger.info("=" * 60)

        for symbol in self.symbols:
            logger.info(f"\n📊 {symbol} Status:")
            self._log_order_book(symbol)
            self._log_session_statistics(symbol)
            self._log_market_profile(symbol)

    def _log_order_book(self, symbol: str):
        """Log price, spread, depth and the latest signals for *symbol*."""
        # Basic order book info
        dashboard_data = self.integration.get_dashboard_data(symbol)
        if not dashboard_data:
            return

        logger.info(f" Current Price: ${dashboard_data['mid_price']:.2f}")
        logger.info(f" Spread: ${dashboard_data['spread']:.4f}")
        logger.info(f" Bid Levels: {len(dashboard_data['bids'])}")
        logger.info(f" Ask Levels: {len(dashboard_data['asks'])}")

        # Recent signals
        recent_signals = dashboard_data['recent_signals']
        if not recent_signals:
            return
        logger.info(f" Recent Signals: {len(recent_signals)}")
        # Only the three most recent entries are shown.
        for signal in recent_signals[-3:]:
            logger.info(f" {signal['type']}: ${signal['volume']:.0f} @ ${signal['price']:.2f} (confidence: {signal['confidence']:.2f})")

    def _log_session_statistics(self, symbol: str):
        """Log the session volume profile statistics for *symbol*."""
        # Session Volume Profile
        session_stats = self.integration.get_session_statistics(symbol)
        if not session_stats:
            return

        logger.info(" 📈 Session Statistics:")
        logger.info(f" VWAP: ${session_stats['session_vwap']:.2f}")
        logger.info(f" POC: ${session_stats['poc_price']:.2f}")
        logger.info(f" Volume: ${session_stats['total_volume']:.0f}")
        logger.info(f" Trades: {session_stats['total_trades']}")
        logger.info(f" Buy/Sell Ratio: {session_stats['buy_sell_ratio']:.2f}")
        logger.info(f" Session Duration: {session_stats['session_duration_hours']:.1f}h")

    def _log_market_profile(self, symbol: str):
        """Log the market structure and nearest key levels for *symbol*."""
        # Market Profile Analysis
        market_analysis = self.integration.get_market_profile_analysis(symbol)
        if not market_analysis:
            return

        structure = market_analysis['market_structure']
        key_levels = market_analysis['key_levels']

        logger.info(" 🏗️ Market Structure:")
        logger.info(f" Price vs VWAP: {structure['price_vs_vwap']} ({structure['distance_from_vwap_bps']} bps)")
        logger.info(f" Price vs POC: {structure['price_vs_poc']} ({structure['distance_from_poc_bps']} bps)")
        logger.info(f" In Value Area: {structure['in_value_area']}")

        # A falsy level (e.g. None) is treated as "no level available".
        if key_levels['nearest_resistance']:
            logger.info(f" Nearest Resistance: ${key_levels['nearest_resistance']:.2f}")
        if key_levels['nearest_support']:
            logger.info(f" Nearest Support: ${key_levels['nearest_support']:.2f}")

    async def cleanup(self):
        """Clear the running flag and stop the integration's streaming."""
        logger.info("\n🧹 Cleaning up...")
        self.running = False
        await self.integration.stop_streaming()
        logger.info("Demo completed successfully!")
async def main():
    """Print the demo banner, run the demo for five minutes and log the outcome.

    An exception escaping the demo run is logged together with its traceback
    and is not re-raised.
    """
    print("""
🔥 FREE ORDER BOOK INTEGRATION DEMO 🔥
This demo shows how to get Bookmap-style functionality using FREE data sources:
✅ Real-time order book depth (20 levels) from Binance WebSocket
✅ Session Volume Profile (SVP) calculated from trade data
✅ Order flow detection (sweeps, absorptions, momentum)
✅ Market profile analysis with POC and Value Area
✅ Real-time heatmap generation
✅ CNN and DQN model feature integration
No paid APIs required! 🎉
""")

    demo = OrderBookIntegrationDemo()

    try:
        # Run main demo
        await demo.run_demo(duration_minutes=5)
        logger.info("\n✅ Demo completed successfully!")
        logger.info("Check 'orderbook_test.log' for detailed logs")
    except Exception as e:
        # logger.exception logs at ERROR level and appends the active
        # traceback, so no manual traceback formatting is needed.
        logger.exception(f"Demo failed: {e}")
# Script entry point: run the async demo to completion when executed directly.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to end the demo early.
        print("\n👋 Demo stopped by user")
    except Exception as exc:
        # Last-resort report for anything main() did not handle itself.
        print(f"❌ Demo error: {exc}")