#!/usr/bin/env python3
"""
Example: Using Unified Storage System with DataProvider

This example demonstrates how to use the unified storage system
for both real-time and historical data access.

Each ``example_*`` coroutine creates its own ``DataProvider``, enables
unified storage, prints what the provider returns, and disables unified
storage again (also when the example fails part-way through).
"""

import asyncio
import sys
import traceback
from datetime import datetime, timedelta
from pathlib import Path

# Add parent directory to path so that the ``core`` package can be imported
# when this script is run directly.
sys.path.insert(0, str(Path(__file__).parent.parent))

from core.data_provider import DataProvider

# Width of the "=====" rule printed around section titles.
_BANNER_WIDTH = 60


def _print_banner(title: str) -> None:
    """Print *title* framed by two horizontal rules, preceded by a blank line."""
    rule = "=" * _BANNER_WIDTH
    print("\n" + rule)
    print(title)
    print(rule)


async def _enable_storage(data_provider: DataProvider) -> bool:
    """Enable unified storage on *data_provider* and report the outcome.

    Returns:
        True when ``enable_unified_storage()`` returned a truthy value,
        False otherwise (a failure message has then already been printed).
    """
    print("\n1. Enabling unified storage...")
    if not await data_provider.enable_unified_storage():
        print("❌ Failed to enable unified storage")
        return False
    print("✅ Unified storage enabled")
    return True


async def example_realtime_data() -> None:
    """Example: Get real-time data from cache"""
    _print_banner("EXAMPLE 1: Real-Time Data (from cache)")

    data_provider = DataProvider()
    if not await _enable_storage(data_provider):
        return

    try:
        # Get latest real-time data (no timestamp argument is passed)
        print("\n2. Getting latest real-time data...")
        inference_data = await data_provider.get_inference_data_unified('ETH/USDT')

        print("\n📊 Inference Data:")
        print(f" Symbol: {inference_data.symbol}")
        print(f" Timestamp: {inference_data.timestamp}")
        print(f" Data Source: {inference_data.data_source}")
        print(f" Query Latency: {inference_data.query_latency_ms:.2f}ms")

        # Check data completeness
        print(f"\n✓ Complete Data: {inference_data.has_complete_data()}")

        # Get data summary
        summary = inference_data.get_data_summary()
        print("\n📈 Data Summary:")
        print(f" OHLCV 1s rows: {summary['ohlcv_1s_rows']}")
        print(f" OHLCV 1m rows: {summary['ohlcv_1m_rows']}")
        print(f" OHLCV 1h rows: {summary['ohlcv_1h_rows']}")
        print(f" Has orderbook: {summary['has_orderbook']}")
        print(f" Imbalances rows: {summary['imbalances_rows']}")

        # Get latest price
        # NOTE(review): truthiness check kept as in the original; it also
        # skips a price of 0 - confirm what get_latest_price() returns when
        # no price is available.
        latest_price = inference_data.get_latest_price()
        if latest_price:
            print(f"\n💰 Latest Price: ${latest_price:.2f}")

        # Get technical indicators
        if inference_data.indicators:
            print("\n📉 Technical Indicators:")
            for indicator, value in inference_data.indicators.items():
                print(f" {indicator}: {value:.4f}")
    finally:
        # Cleanup - runs even if one of the calls above raised.
        await data_provider.disable_unified_storage()


async def example_historical_data() -> None:
    """Example: Get historical data from database"""
    _print_banner("EXAMPLE 2: Historical Data (from database)")

    data_provider = DataProvider()
    if not await _enable_storage(data_provider):
        return

    try:
        # Get historical data from 1 hour ago
        # NOTE(review): naive local time, as in the original - confirm which
        # timezone DataProvider expects.
        target_time = datetime.now() - timedelta(hours=1)
        print(f"\n2. Getting historical data at {target_time}...")

        inference_data = await data_provider.get_inference_data_unified(
            symbol='ETH/USDT',
            timestamp=target_time,
            context_window_minutes=5,
        )

        print("\n📊 Inference Data:")
        print(f" Symbol: {inference_data.symbol}")
        print(f" Timestamp: {inference_data.timestamp}")
        print(f" Data Source: {inference_data.data_source}")
        print(f" Query Latency: {inference_data.query_latency_ms:.2f}ms")

        # Show multi-timeframe data
        print("\n📈 Multi-Timeframe Data:")
        for tf in ['1s', '1m', '5m', '15m', '1h', '1d']:
            df = inference_data.get_timeframe_data(tf)
            print(f" {tf}: {len(df)} candles")

        # Show context data
        if inference_data.context_data is not None:
            print(f"\n🔍 Context Data: {len(inference_data.context_data)} rows")
    finally:
        # Cleanup - runs even if one of the calls above raised.
        await data_provider.disable_unified_storage()


async def example_multi_timeframe() -> None:
    """Example: Get multi-timeframe data"""
    _print_banner("EXAMPLE 3: Multi-Timeframe Data")

    data_provider = DataProvider()
    if not await _enable_storage(data_provider):
        return

    try:
        # Get multiple timeframes
        print("\n2. Getting multi-timeframe data...")
        multi_tf = await data_provider.get_multi_timeframe_data_unified(
            symbol='ETH/USDT',
            timeframes=['1m', '5m', '1h'],
            limit=100,
        )

        print("\n📊 Multi-Timeframe Data:")
        for timeframe, df in multi_tf.items():
            print(f"\n {timeframe}:")
            print(f" Rows: {len(df)}")
            if not df.empty:
                # Last row of the frame is reported as the "latest" candle.
                latest = df.iloc[-1]
                print(f" Latest close: ${latest['close_price']:.2f}")
                print(f" Latest volume: {latest['volume']:.2f}")
    finally:
        # Cleanup - runs even if one of the calls above raised.
        await data_provider.disable_unified_storage()


async def example_orderbook() -> None:
    """Example: Get order book data"""
    _print_banner("EXAMPLE 4: Order Book Data")

    data_provider = DataProvider()
    if not await _enable_storage(data_provider):
        return

    try:
        # Get order book
        print("\n2. Getting order book data...")
        orderbook = await data_provider.get_order_book_data_unified('ETH/USDT')

        print("\n📊 Order Book:")
        print(f" Symbol: {orderbook.symbol}")
        print(f" Timestamp: {orderbook.timestamp}")
        print(f" Mid Price: ${orderbook.mid_price:.2f}")
        print(f" Spread: ${orderbook.spread:.4f}")
        print(f" Spread (bps): {orderbook.get_spread_bps():.2f}")

        # Show best bid/ask; each is indexed as (price, size) below.
        best_bid = orderbook.get_best_bid()
        best_ask = orderbook.get_best_ask()
        if best_bid:
            print(f"\n Best Bid: ${best_bid[0]:.2f} (size: {best_bid[1]:.4f})")
        if best_ask:
            print(f" Best Ask: ${best_ask[0]:.2f} (size: {best_ask[1]:.4f})")

        # Show imbalances
        imbalances = orderbook.get_imbalance_summary()
        print("\n📉 Imbalances:")
        for key, value in imbalances.items():
            print(f" {key}: {value:.4f}")
    finally:
        # Cleanup - runs even if one of the calls above raised.
        await data_provider.disable_unified_storage()


async def example_statistics() -> None:
    """Example: Get unified storage statistics"""
    _print_banner("EXAMPLE 5: Unified Storage Statistics")

    data_provider = DataProvider()
    if not await _enable_storage(data_provider):
        return

    try:
        # Get some data to generate stats
        print("\n2. Generating some activity...")
        await data_provider.get_inference_data_unified('ETH/USDT')
        await data_provider.get_inference_data_unified('BTC/USDT')

        # Get statistics
        print("\n3. Getting statistics...")
        stats = data_provider.get_unified_storage_stats()

        # Each section is printed only when present and non-empty; missing
        # individual counters fall back to 0.
        if stats.get('cache'):
            print("\n📊 Cache Statistics:")
            cache_stats = stats['cache']
            print(f" Hit Rate: {cache_stats.get('hit_rate_percent', 0):.2f}%")
            print(f" Total Entries: {cache_stats.get('total_entries', 0)}")
            print(f" Cache Hits: {cache_stats.get('cache_hits', 0)}")
            print(f" Cache Misses: {cache_stats.get('cache_misses', 0)}")

        if stats.get('database'):
            print("\n💾 Database Statistics:")
            db_stats = stats['database']
            print(f" Total Queries: {db_stats.get('total_queries', 0)}")
            print(f" Failed Queries: {db_stats.get('failed_queries', 0)}")
            print(f" Avg Query Time: {db_stats.get('avg_query_time_ms', 0):.2f}ms")
            print(f" Success Rate: {db_stats.get('success_rate', 0):.2f}%")

        if stats.get('ingestion'):
            print("\n📥 Ingestion Statistics:")
            ing_stats = stats['ingestion']
            print(f" Total Ingested: {ing_stats.get('total_ingested', 0)}")
            print(f" OHLCV Ingested: {ing_stats.get('ohlcv_ingested', 0)}")
            print(f" Validation Failures: {ing_stats.get('validation_failures', 0)}")
            print(f" DB Writes: {ing_stats.get('db_writes', 0)}")
    finally:
        # Cleanup - runs even if one of the calls above raised.
        await data_provider.disable_unified_storage()


async def main() -> None:
    """Run all examples"""
    _print_banner("UNIFIED STORAGE SYSTEM EXAMPLES")

    examples = (
        example_realtime_data,
        example_historical_data,
        example_multi_timeframe,
        example_orderbook,
        example_statistics,
    )

    try:
        # Run examples in order, pausing one second between consecutive ones.
        for index, example in enumerate(examples):
            if index:
                await asyncio.sleep(1)
            await example()

        print("\n" + "=" * _BANNER_WIDTH)
        print("✅ All examples completed successfully!")
        print("=" * _BANNER_WIDTH + "\n")
    except Exception as e:
        # Top-level boundary of the script: report the error with its
        # traceback instead of letting it propagate out of asyncio.run().
        print(f"\n❌ Error running examples: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())