Files
gogo2/examples/unified_storage_example.py
Dobromir Popov 68b91f37bd better pivots
2025-10-21 11:45:57 +03:00

275 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""
Example: Using Unified Storage System with DataProvider
This example demonstrates how to use the unified storage system
for both real-time and historical data access.
"""
import asyncio
import sys
from pathlib import Path
from datetime import datetime, timedelta
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from core.data_provider import DataProvider
async def example_realtime_data():
    """Example 1: fetch and print the latest inference data for ETH/USDT.

    Creates a ``DataProvider``, enables unified storage, and calls
    ``get_inference_data_unified`` with only a symbol (no timestamp). The
    returned object's symbol, timestamp, data source, query latency, data
    summary, latest price and indicators are printed to stdout.

    If ``enable_unified_storage()`` reports failure, a message is printed and
    the function returns without querying. Once storage has been enabled it
    is always disabled again, even if a later step raises.
    """
    print("\n" + "=" * 60)
    print("EXAMPLE 1: Real-Time Data (from cache)")
    print("=" * 60)

    data_provider = DataProvider()

    # Enable unified storage
    print("\n1. Enabling unified storage...")
    success = await data_provider.enable_unified_storage()
    if not success:
        print("❌ Failed to enable unified storage")
        return
    print("✅ Unified storage enabled")

    try:
        # Get latest real-time data
        print("\n2. Getting latest real-time data...")
        inference_data = await data_provider.get_inference_data_unified('ETH/USDT')

        print("\n📊 Inference Data:")
        print(f" Symbol: {inference_data.symbol}")
        print(f" Timestamp: {inference_data.timestamp}")
        print(f" Data Source: {inference_data.data_source}")
        print(f" Query Latency: {inference_data.query_latency_ms:.2f}ms")

        # Check data completeness
        print(f"\n✓ Complete Data: {inference_data.has_complete_data()}")

        # Get data summary
        summary = inference_data.get_data_summary()
        print("\n📈 Data Summary:")
        print(f" OHLCV 1s rows: {summary['ohlcv_1s_rows']}")
        print(f" OHLCV 1m rows: {summary['ohlcv_1m_rows']}")
        print(f" OHLCV 1h rows: {summary['ohlcv_1h_rows']}")
        print(f" Has orderbook: {summary['has_orderbook']}")
        print(f" Imbalances rows: {summary['imbalances_rows']}")

        # Get latest price (skipped when the provider returns a falsy value)
        latest_price = inference_data.get_latest_price()
        if latest_price:
            print(f"\n💰 Latest Price: ${latest_price:.2f}")

        # Get technical indicators
        if inference_data.indicators:
            print("\n📉 Technical Indicators:")
            for indicator, value in inference_data.indicators.items():
                print(f" {indicator}: {value:.4f}")
    finally:
        # Cleanup must run even when a query or formatting step fails,
        # otherwise unified storage would be left enabled.
        await data_provider.disable_unified_storage()
async def example_historical_data():
    """Example 2: fetch and print inference data for a point one hour ago.

    Creates a ``DataProvider``, enables unified storage, and calls
    ``get_inference_data_unified`` for ``ETH/USDT`` with an explicit
    ``timestamp`` (local ``datetime.now()`` minus one hour) and
    ``context_window_minutes=5``. Prints the basic fields, the candle count
    for each of six timeframes, and the context-data row count when present.

    If ``enable_unified_storage()`` reports failure, a message is printed and
    the function returns without querying. Once storage has been enabled it
    is always disabled again, even if a later step raises.
    """
    print("\n" + "=" * 60)
    print("EXAMPLE 2: Historical Data (from database)")
    print("=" * 60)

    data_provider = DataProvider()

    # Enable unified storage; bail out on failure like example_realtime_data
    print("\n1. Enabling unified storage...")
    success = await data_provider.enable_unified_storage()
    if not success:
        print("❌ Failed to enable unified storage")
        return

    try:
        # Get historical data from 1 hour ago
        # NOTE(review): this is a naive local-time datetime — confirm the
        # storage layer does not expect a timezone-aware/UTC timestamp.
        target_time = datetime.now() - timedelta(hours=1)
        print(f"\n2. Getting historical data at {target_time}...")
        inference_data = await data_provider.get_inference_data_unified(
            symbol='ETH/USDT',
            timestamp=target_time,
            context_window_minutes=5,
        )

        print("\n📊 Inference Data:")
        print(f" Symbol: {inference_data.symbol}")
        print(f" Timestamp: {inference_data.timestamp}")
        print(f" Data Source: {inference_data.data_source}")
        print(f" Query Latency: {inference_data.query_latency_ms:.2f}ms")

        # Show multi-timeframe data
        print("\n📈 Multi-Timeframe Data:")
        for tf in ['1s', '1m', '5m', '15m', '1h', '1d']:
            df = inference_data.get_timeframe_data(tf)
            # Report a missing frame as zero candles instead of failing on
            # len(None); assumes None is how a missing timeframe is signalled
            # — TODO confirm against get_timeframe_data.
            candle_count = len(df) if df is not None else 0
            print(f" {tf}: {candle_count} candles")

        # Show context data
        if inference_data.context_data is not None:
            print(f"\n🔍 Context Data: {len(inference_data.context_data)} rows")
    finally:
        # Cleanup must run even when a query or formatting step fails,
        # otherwise unified storage would be left enabled.
        await data_provider.disable_unified_storage()
async def example_multi_timeframe():
    """Example 3: fetch and print several timeframes in one call.

    Creates a ``DataProvider``, enables unified storage, and calls
    ``get_multi_timeframe_data_unified`` for ``ETH/USDT`` with timeframes
    ``1m``, ``5m`` and ``1h`` and ``limit=100``. For each returned
    timeframe the row count is printed, plus the last row's ``close_price``
    and ``volume`` when the frame is not empty.

    If ``enable_unified_storage()`` reports failure, a message is printed and
    the function returns without querying. Once storage has been enabled it
    is always disabled again, even if a later step raises.
    """
    print("\n" + "=" * 60)
    print("EXAMPLE 3: Multi-Timeframe Data")
    print("=" * 60)

    data_provider = DataProvider()

    # Enable unified storage; bail out on failure like example_realtime_data
    print("\n1. Enabling unified storage...")
    success = await data_provider.enable_unified_storage()
    if not success:
        print("❌ Failed to enable unified storage")
        return

    try:
        # Get multiple timeframes
        print("\n2. Getting multi-timeframe data...")
        multi_tf = await data_provider.get_multi_timeframe_data_unified(
            symbol='ETH/USDT',
            timeframes=['1m', '5m', '1h'],
            limit=100,
        )

        print("\n📊 Multi-Timeframe Data:")
        for timeframe, df in multi_tf.items():
            print(f"\n {timeframe}:")
            print(f" Rows: {len(df)}")
            if not df.empty:
                # Last row of the frame is treated as the most recent candle
                latest = df.iloc[-1]
                print(f" Latest close: ${latest['close_price']:.2f}")
                print(f" Latest volume: {latest['volume']:.2f}")
    finally:
        # Cleanup must run even when a query or formatting step fails,
        # otherwise unified storage would be left enabled.
        await data_provider.disable_unified_storage()
async def example_orderbook():
    """Example 4: fetch and print order book data for ETH/USDT.

    Creates a ``DataProvider``, enables unified storage, and calls
    ``get_order_book_data_unified('ETH/USDT')``. Prints the symbol,
    timestamp, mid price, spread (absolute and in bps), the best bid/ask
    when available, and every entry of the imbalance summary.

    If ``enable_unified_storage()`` reports failure, a message is printed and
    the function returns without querying. Once storage has been enabled it
    is always disabled again, even if a later step raises.
    """
    print("\n" + "=" * 60)
    print("EXAMPLE 4: Order Book Data")
    print("=" * 60)

    data_provider = DataProvider()

    # Enable unified storage; bail out on failure like example_realtime_data
    print("\n1. Enabling unified storage...")
    success = await data_provider.enable_unified_storage()
    if not success:
        print("❌ Failed to enable unified storage")
        return

    try:
        # Get order book
        print("\n2. Getting order book data...")
        orderbook = await data_provider.get_order_book_data_unified('ETH/USDT')

        print("\n📊 Order Book:")
        print(f" Symbol: {orderbook.symbol}")
        print(f" Timestamp: {orderbook.timestamp}")
        print(f" Mid Price: ${orderbook.mid_price:.2f}")
        print(f" Spread: ${orderbook.spread:.4f}")
        print(f" Spread (bps): {orderbook.get_spread_bps():.2f}")

        # Show best bid/ask; each is indexed as [0] = price, [1] = size
        best_bid = orderbook.get_best_bid()
        best_ask = orderbook.get_best_ask()
        if best_bid:
            print(f"\n Best Bid: ${best_bid[0]:.2f} (size: {best_bid[1]:.4f})")
        if best_ask:
            print(f" Best Ask: ${best_ask[0]:.2f} (size: {best_ask[1]:.4f})")

        # Show imbalances
        imbalances = orderbook.get_imbalance_summary()
        print("\n📉 Imbalances:")
        for key, value in imbalances.items():
            print(f" {key}: {value:.4f}")
    finally:
        # Cleanup must run even when a query or formatting step fails,
        # otherwise unified storage would be left enabled.
        await data_provider.disable_unified_storage()
async def example_statistics():
    """Example 5: generate some activity and print unified storage statistics.

    Creates a ``DataProvider``, enables unified storage, issues two
    ``get_inference_data_unified`` calls (``ETH/USDT`` and ``BTC/USDT``)
    whose results are discarded, then reads ``get_unified_storage_stats()``
    and prints the ``cache``, ``database`` and ``ingestion`` sections that
    are present and non-empty. Missing counters are printed as ``0``.

    If ``enable_unified_storage()`` reports failure, a message is printed and
    the function returns without querying. Once storage has been enabled it
    is always disabled again, even if a later step raises.
    """
    print("\n" + "=" * 60)
    print("EXAMPLE 5: Unified Storage Statistics")
    print("=" * 60)

    data_provider = DataProvider()

    # Enable unified storage; bail out on failure like example_realtime_data
    print("\n1. Enabling unified storage...")
    success = await data_provider.enable_unified_storage()
    if not success:
        print("❌ Failed to enable unified storage")
        return

    try:
        # Get some data to generate stats (return values are not needed)
        print("\n2. Generating some activity...")
        await data_provider.get_inference_data_unified('ETH/USDT')
        await data_provider.get_inference_data_unified('BTC/USDT')

        # Get statistics
        print("\n3. Getting statistics...")
        stats = data_provider.get_unified_storage_stats()

        if stats.get('cache'):
            print("\n📊 Cache Statistics:")
            cache_stats = stats['cache']
            print(f" Hit Rate: {cache_stats.get('hit_rate_percent', 0):.2f}%")
            print(f" Total Entries: {cache_stats.get('total_entries', 0)}")
            print(f" Cache Hits: {cache_stats.get('cache_hits', 0)}")
            print(f" Cache Misses: {cache_stats.get('cache_misses', 0)}")

        if stats.get('database'):
            print("\n💾 Database Statistics:")
            db_stats = stats['database']
            print(f" Total Queries: {db_stats.get('total_queries', 0)}")
            print(f" Failed Queries: {db_stats.get('failed_queries', 0)}")
            print(f" Avg Query Time: {db_stats.get('avg_query_time_ms', 0):.2f}ms")
            print(f" Success Rate: {db_stats.get('success_rate', 0):.2f}%")

        if stats.get('ingestion'):
            print("\n📥 Ingestion Statistics:")
            ing_stats = stats['ingestion']
            print(f" Total Ingested: {ing_stats.get('total_ingested', 0)}")
            print(f" OHLCV Ingested: {ing_stats.get('ohlcv_ingested', 0)}")
            print(f" Validation Failures: {ing_stats.get('validation_failures', 0)}")
            print(f" DB Writes: {ing_stats.get('db_writes', 0)}")
    finally:
        # Cleanup must run even when a query or formatting step fails,
        # otherwise unified storage would be left enabled.
        await data_provider.disable_unified_storage()
async def main():
    """Run the five example coroutines in order with a one-second pause between them.

    Any ``Exception`` raised by an example is caught here: an error line is
    printed and the traceback is emitted via ``traceback.print_exc()``. The
    exception is not re-raised, and the remaining examples are not run.
    """
    rule = "=" * 60
    print("\n" + rule)
    print("UNIFIED STORAGE SYSTEM EXAMPLES")
    print(rule)

    try:
        # Run examples
        examples = (
            example_realtime_data,
            example_historical_data,
            example_multi_timeframe,
            example_orderbook,
            example_statistics,
        )
        for position, run_example in enumerate(examples):
            if position:
                # Pause between consecutive examples, not before the first
                await asyncio.sleep(1)
            await run_example()

        print("\n" + rule)
        print("✅ All examples completed successfully!")
        print(rule + "\n")
    except Exception as exc:
        print(f"\n❌ Error running examples: {exc}")
        import traceback
        traceback.print_exc()
# Script entry point: run all examples on a fresh event loop, but only when
# this file is executed directly (not when it is imported).
if __name__ == "__main__":
    asyncio.run(main())