# Source: gogo2/core/unified_cache_manager.py
# Commit: e993bc2831 "cache, pivots wip" — Dobromir Popov, 2025-10-20 15:21:44 +03:00
# (548 lines, 20 KiB, Python; web-viewer listing header converted to comments so the file parses)
"""
Data Cache Manager for unified storage system.
Provides low-latency in-memory caching for real-time data access.
"""
import time
import logging
from collections import deque
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Deque, Any
from threading import Lock
import pandas as pd
from .unified_data_models import OHLCVCandle, OrderBookDataFrame, TradeEvent
logger = logging.getLogger(__name__)
class DataCacheManager:
    """
    Manages in-memory cache for real-time data.
    Provides <10ms latency for latest data access.

    Cache Structure:
        - OHLCV: Last 5 minutes per symbol per timeframe
        - Order book: Last 5 minutes per symbol
        - Imbalances: Last 5 minutes per symbol
        - Trades: Last 5 minutes per symbol

    All caches are bounded deques guarded by a single lock; entries are also
    evicted by age (see evict_old_data).
    """

    def __init__(self, cache_duration_seconds: int = 300):
        """
        Initialize cache manager.

        Args:
            cache_duration_seconds: Duration to keep data in cache (default 5 minutes)
        """
        self.cache_duration = cache_duration_seconds
        self.cache_duration_td = timedelta(seconds=cache_duration_seconds)

        # Single lock guarding every cache dict and all statistics counters.
        self.lock = Lock()

        # OHLCV cache: {symbol: {timeframe: deque of candle dicts}}
        self.ohlcv_cache: Dict[str, Dict[str, Deque[Dict]]] = {}
        # Order book cache: {symbol: deque of snapshot dicts}
        self.orderbook_cache: Dict[str, Deque[Dict]] = {}
        # Imbalance cache: {symbol: deque of imbalance metric dicts}
        self.imbalance_cache: Dict[str, Deque[Dict]] = {}
        # Trade cache: {symbol: deque of trade dicts}
        self.trade_cache: Dict[str, Deque[Dict]] = {}

        # Cache statistics
        self.cache_hits = 0
        self.cache_misses = 0
        self.total_inserts = 0
        self.total_evictions = 0

        # Time-based eviction bookkeeping
        self.last_eviction = datetime.now()
        self.eviction_interval = timedelta(seconds=10)  # Evict every 10 seconds

        logger.info(f"DataCacheManager initialized with {cache_duration_seconds}s cache duration")

    # ------------------------------------------------------------------
    # Internal helpers (callers must hold self.lock unless noted)
    # ------------------------------------------------------------------

    @staticmethod
    def _with_timestamp(record: Dict) -> Dict:
        """Return a shallow copy of record with a 'timestamp' key ensured.

        NOTE(review): eviction compares this value with datetime.now(), so
        caller-supplied timestamps are assumed to be naive datetimes — confirm
        upstream producers.
        """
        stamped = record.copy()
        if 'timestamp' not in stamped:
            stamped['timestamp'] = datetime.now()
        return stamped

    def _append(self, store: Dict[str, Deque[Dict]], symbol: str, record: Dict,
                maxlen: int, label: str) -> None:
        """Append a timestamped copy of record to store[symbol].

        Creates the bounded deque on first use. Errors are logged, never
        raised, so a malformed record cannot take down the data feed.
        Caller must hold self.lock.
        """
        try:
            if symbol not in store:
                store[symbol] = deque(maxlen=maxlen)
            store[symbol].append(self._with_timestamp(record))
            self.total_inserts += 1
            logger.debug(f"Added {label} to cache: {symbol}")
        except Exception as e:
            logger.error(f"Error adding {label} to cache: {e}")

    def _read_tail(self, entries: List[Dict], limit: int, label: str,
                   start_time: float) -> List[Dict]:
        """Return up to the last `limit` entries (oldest first), recording a hit.

        Caller must hold self.lock.
        """
        result = entries[-limit:] if len(entries) > limit else entries
        self.cache_hits += 1
        latency_ms = (time.time() - start_time) * 1000
        logger.debug(f"Retrieved {len(result)} {label} from cache in {latency_ms:.2f}ms")
        return result

    @staticmethod
    def _drain_old(cache: Deque[Dict], cutoff: datetime) -> int:
        """Pop entries older than cutoff from the left of cache; return count removed.

        Relies on entries being appended in roughly chronological order so the
        oldest entry is always at index 0. Caller must hold self.lock.
        """
        removed = 0
        while cache and cache[0]['timestamp'] < cutoff:
            cache.popleft()
            removed += 1
        return removed

    # ------------------------------------------------------------------
    # Write path
    # ------------------------------------------------------------------

    def add_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict):
        """
        Add OHLCV candle to cache.

        Args:
            symbol: Trading symbol
            timeframe: Timeframe (1s, 1m, etc.)
            candle: Candle dictionary with OHLCV data
        """
        with self.lock:
            try:
                tf_caches = self.ohlcv_cache.setdefault(symbol, {})
                if timeframe not in tf_caches:
                    # Bound the deque so it can never exceed the cache window
                    # for this timeframe, even if age-based eviction lags.
                    max_items = self._calculate_max_items(timeframe)
                    tf_caches[timeframe] = deque(maxlen=max_items)
                tf_caches[timeframe].append(self._with_timestamp(candle))
                self.total_inserts += 1
                logger.debug(f"Added OHLCV candle to cache: {symbol} {timeframe}")
            except Exception as e:
                logger.error(f"Error adding OHLCV candle to cache: {e}")

    def add_orderbook_snapshot(self, symbol: str, snapshot: Dict):
        """
        Add order book snapshot to cache.

        Args:
            symbol: Trading symbol
            snapshot: Order book snapshot dictionary
        """
        with self.lock:
            # 5 minutes at ~1 snapshot per second = 300 snapshots
            self._append(self.orderbook_cache, symbol, snapshot, 300, "order book snapshot")

    def add_imbalance_data(self, symbol: str, imbalance: Dict):
        """
        Add imbalance metrics to cache.

        Args:
            symbol: Trading symbol
            imbalance: Imbalance metrics dictionary
        """
        with self.lock:
            # 5 minutes at 1 per second = 300 entries
            self._append(self.imbalance_cache, symbol, imbalance, 300, "imbalance data")

    def add_trade(self, symbol: str, trade: Dict):
        """
        Add trade event to cache.

        Args:
            symbol: Trading symbol
            trade: Trade event dictionary
        """
        with self.lock:
            # 5 minutes at ~10 trades per second = 3000 trades
            self._append(self.trade_cache, symbol, trade, 3000, "trade")

    # ------------------------------------------------------------------
    # Read path
    # ------------------------------------------------------------------

    def get_latest_ohlcv(self, symbol: str, timeframe: str, limit: int = 100) -> List[Dict]:
        """
        Get latest OHLCV candles from cache.

        Args:
            symbol: Trading symbol
            timeframe: Timeframe
            limit: Maximum number of candles to return

        Returns:
            List of candle dictionaries (most recent last)
        """
        start_time = time.time()
        with self.lock:
            try:
                tf_caches = self.ohlcv_cache.get(symbol)
                if not tf_caches or timeframe not in tf_caches:
                    self.cache_misses += 1
                    return []
                return self._read_tail(list(tf_caches[timeframe]), limit,
                                       "OHLCV candles", start_time)
            except Exception as e:
                logger.error(f"Error getting OHLCV from cache: {e}")
                self.cache_misses += 1
                return []

    def get_latest_orderbook(self, symbol: str) -> Optional[Dict]:
        """
        Get latest order book snapshot from cache.

        Args:
            symbol: Trading symbol

        Returns:
            Latest order book snapshot or None
        """
        start_time = time.time()
        with self.lock:
            try:
                if symbol not in self.orderbook_cache or not self.orderbook_cache[symbol]:
                    self.cache_misses += 1
                    return None
                # Copy so callers cannot mutate the cached snapshot in place.
                result = self.orderbook_cache[symbol][-1].copy()
                self.cache_hits += 1
                latency_ms = (time.time() - start_time) * 1000
                logger.debug(f"Retrieved order book from cache in {latency_ms:.2f}ms")
                return result
            except Exception as e:
                logger.error(f"Error getting order book from cache: {e}")
                self.cache_misses += 1
                return None

    def get_latest_imbalances(self, symbol: str, limit: int = 60) -> List[Dict]:
        """
        Get latest imbalance metrics from cache.

        Args:
            symbol: Trading symbol
            limit: Maximum number of entries to return

        Returns:
            List of imbalance dictionaries (most recent last)
        """
        start_time = time.time()
        with self.lock:
            try:
                if symbol not in self.imbalance_cache:
                    self.cache_misses += 1
                    return []
                return self._read_tail(list(self.imbalance_cache[symbol]), limit,
                                       "imbalances", start_time)
            except Exception as e:
                logger.error(f"Error getting imbalances from cache: {e}")
                self.cache_misses += 1
                return []

    def get_latest_trades(self, symbol: str, limit: int = 100) -> List[Dict]:
        """
        Get latest trades from cache.

        Args:
            symbol: Trading symbol
            limit: Maximum number of trades to return

        Returns:
            List of trade dictionaries (most recent last)
        """
        start_time = time.time()
        with self.lock:
            try:
                if symbol not in self.trade_cache:
                    self.cache_misses += 1
                    return []
                return self._read_tail(list(self.trade_cache[symbol]), limit,
                                       "trades", start_time)
            except Exception as e:
                logger.error(f"Error getting trades from cache: {e}")
                self.cache_misses += 1
                return []

    def get_ohlcv_dataframe(self, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame:
        """
        Get OHLCV data as pandas DataFrame.

        Args:
            symbol: Trading symbol
            timeframe: Timeframe
            limit: Maximum number of candles

        Returns:
            DataFrame with OHLCV data (empty DataFrame on cache miss)
        """
        candles = self.get_latest_ohlcv(symbol, timeframe, limit)
        if not candles:
            return pd.DataFrame()
        return pd.DataFrame(candles)

    # ------------------------------------------------------------------
    # Eviction & maintenance
    # ------------------------------------------------------------------

    def evict_old_data(self):
        """Remove data older than cache duration and drop empty sub-caches."""
        with self.lock:
            try:
                now = datetime.now()
                cutoff_time = now - self.cache_duration_td
                eviction_count = 0

                # OHLCV is nested one level deeper (symbol -> timeframe).
                for symbol in list(self.ohlcv_cache.keys()):
                    tf_caches = self.ohlcv_cache[symbol]
                    for timeframe in list(tf_caches.keys()):
                        cache = tf_caches[timeframe]
                        eviction_count += self._drain_old(cache, cutoff_time)
                        if not cache:
                            del tf_caches[timeframe]
                    if not tf_caches:
                        del self.ohlcv_cache[symbol]

                # The three flat caches share identical eviction logic.
                for store in (self.orderbook_cache, self.imbalance_cache, self.trade_cache):
                    for symbol in list(store.keys()):
                        eviction_count += self._drain_old(store[symbol], cutoff_time)
                        if not store[symbol]:
                            del store[symbol]

                self.total_evictions += eviction_count
                self.last_eviction = now
                if eviction_count > 0:
                    logger.debug(f"Evicted {eviction_count} old entries from cache")
            except Exception as e:
                logger.error(f"Error evicting old data: {e}")

    def auto_evict_if_needed(self):
        """Automatically evict old data if the eviction interval has passed.

        last_eviction is read without the lock; worst case two threads both
        trigger eviction, which is harmless since evict_old_data is idempotent.
        """
        now = datetime.now()
        if now - self.last_eviction >= self.eviction_interval:
            self.evict_old_data()

    def clear_cache(self, symbol: Optional[str] = None):
        """
        Clear cache data.

        Args:
            symbol: Symbol to clear (None = clear all)
        """
        with self.lock:
            if symbol:
                # Drop the symbol from every cache it appears in.
                self.ohlcv_cache.pop(symbol, None)
                self.orderbook_cache.pop(symbol, None)
                self.imbalance_cache.pop(symbol, None)
                self.trade_cache.pop(symbol, None)
                logger.info(f"Cleared cache for symbol: {symbol}")
            else:
                self.ohlcv_cache.clear()
                self.orderbook_cache.clear()
                self.imbalance_cache.clear()
                self.trade_cache.clear()
                logger.info("Cleared all cache data")

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics as a plain dictionary (sizes, hit rate, symbols)."""
        with self.lock:
            ohlcv_count = sum(
                sum(len(tf_cache) for tf_cache in symbol_cache.values())
                for symbol_cache in self.ohlcv_cache.values()
            )
            orderbook_count = sum(len(cache) for cache in self.orderbook_cache.values())
            imbalance_count = sum(len(cache) for cache in self.imbalance_cache.values())
            trade_count = sum(len(cache) for cache in self.trade_cache.values())

            total_requests = self.cache_hits + self.cache_misses
            hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0

            return {
                'cache_duration_seconds': self.cache_duration,
                'ohlcv_entries': ohlcv_count,
                'orderbook_entries': orderbook_count,
                'imbalance_entries': imbalance_count,
                'trade_entries': trade_count,
                'total_entries': ohlcv_count + orderbook_count + imbalance_count + trade_count,
                'cache_hits': self.cache_hits,
                'cache_misses': self.cache_misses,
                'hit_rate_percent': round(hit_rate, 2),
                'total_inserts': self.total_inserts,
                'total_evictions': self.total_evictions,
                'last_eviction': self.last_eviction.isoformat(),
                'symbols_cached': {
                    'ohlcv': list(self.ohlcv_cache.keys()),
                    'orderbook': list(self.orderbook_cache.keys()),
                    'imbalance': list(self.imbalance_cache.keys()),
                    'trade': list(self.trade_cache.keys())
                }
            }

    def _calculate_max_items(self, timeframe: str) -> int:
        """
        Calculate maximum cache items for a timeframe.

        Args:
            timeframe: Timeframe string (unknown values fall back to 60s)

        Returns:
            Maximum number of items to cache (at least 10)
        """
        timeframe_seconds = {
            '1s': 1,
            '1m': 60,
            '5m': 300,
            '15m': 900,
            '30m': 1800,
            '1h': 3600,
            '4h': 14400,
            '1d': 86400
        }
        seconds = timeframe_seconds.get(timeframe, 60)
        # How many candles of this timeframe fit in the cache window.
        max_items = self.cache_duration // seconds
        return max(10, max_items)

    def get_cache_summary(self) -> str:
        """Get human-readable cache summary."""
        stats = self.get_cache_stats()
        summary = f"""
Cache Summary:
--------------
Duration: {stats['cache_duration_seconds']}s
Total Entries: {stats['total_entries']}
- OHLCV: {stats['ohlcv_entries']}
- Order Book: {stats['orderbook_entries']}
- Imbalances: {stats['imbalance_entries']}
- Trades: {stats['trade_entries']}
Performance:
- Cache Hits: {stats['cache_hits']}
- Cache Misses: {stats['cache_misses']}
- Hit Rate: {stats['hit_rate_percent']}%
- Total Inserts: {stats['total_inserts']}
- Total Evictions: {stats['total_evictions']}
Last Eviction: {stats['last_eviction']}
"""
        return summary