# gogo2/utils/cache_manager.py
# Last modified: 2025-07-25 22:34:13 +03:00 — 295 lines, 11 KiB, Python
"""
Cache Manager for Trading System
Utilities for managing and cleaning up cache files, including:
- Parquet file validation and repair
- Cache cleanup and maintenance
- Cache health monitoring
"""
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
logger = logging.getLogger(__name__)
class CacheManager:
    """Manages cache files for the trading system.

    Provides Parquet file validation, cache health scanning, cleanup of
    corrupted and stale files, and an emergency full reset across a
    configurable set of cache directories.
    """

    def __init__(self, cache_dirs: Optional[List[str]] = None):
        """
        Initialize cache manager.

        Args:
            cache_dirs: List of cache directories to manage. Defaults to
                the standard data cache directories when None.
        """
        self.cache_dirs = cache_dirs or [
            "data/cache",
            "data/monthly_cache",
            "data/pivot_cache"
        ]
        # Ensure cache directories exist so later scans don't have to
        # special-case missing paths that we own.
        for cache_dir in self.cache_dirs:
            Path(cache_dir).mkdir(parents=True, exist_ok=True)

    def validate_parquet_file(self, file_path: Path) -> Tuple[bool, Optional[str]]:
        """
        Validate a Parquet file.

        Args:
            file_path: Path to the Parquet file

        Returns:
            Tuple of (is_valid, error_message). error_message is None
            when the file is valid.
        """
        try:
            if not file_path.exists():
                return False, "File does not exist"
            if file_path.stat().st_size == 0:
                return False, "File is empty"
            # Actually read the file - this catches most forms of corruption.
            df = pd.read_parquet(file_path)
            if df.empty:
                return False, "File contains no data"
            # Check for required OHLCV columns (basic schema validation).
            required_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                return False, f"Missing required columns: {missing_columns}"
            return True, None
        except Exception as e:
            # Distinguish structural corruption from other read errors so
            # callers can report/handle them separately.
            error_str = str(e).lower()
            corrupted_indicators = [
                "parquet magic bytes not found",
                "corrupted",
                "couldn't deserialize thrift",
                "don't know what type",
                "invalid parquet file",
                "unexpected end of file",
                "invalid metadata"
            ]
            if any(indicator in error_str for indicator in corrupted_indicators):
                return False, f"Corrupted Parquet file: {e}"
            else:
                return False, f"Validation error: {e}"

    def scan_cache_health(self) -> Dict[str, Dict]:
        """
        Scan all cache directories for file health.

        Returns:
            Dictionary keyed by cache directory with per-directory counts,
            total size, and lists of corrupted/old files.
        """
        health_report = {}
        # Single timestamp so file ages are consistent across the scan.
        now = datetime.now()
        for cache_dir in self.cache_dirs:
            cache_path = Path(cache_dir)
            if not cache_path.exists():
                continue
            dir_report = {
                'total_files': 0,
                'valid_files': 0,
                'corrupted_files': 0,
                'empty_files': 0,
                'total_size_mb': 0.0,
                'corrupted_files_list': [],
                'old_files': []
            }
            # Scan all Parquet files in this directory (non-recursive).
            for file_path in cache_path.glob("*.parquet"):
                dir_report['total_files'] += 1
                # stat() once per file; reuse for both size and mtime.
                stat_result = file_path.stat()
                file_size_mb = stat_result.st_size / (1024 * 1024)
                dir_report['total_size_mb'] += file_size_mb
                # Flag files older than 7 days as candidates for cleanup.
                file_age = now - datetime.fromtimestamp(stat_result.st_mtime)
                if file_age > timedelta(days=7):
                    dir_report['old_files'].append({
                        'file': str(file_path),
                        'age_days': file_age.days,
                        'size_mb': file_size_mb
                    })
                # Validate file content/schema.
                is_valid, error_msg = self.validate_parquet_file(file_path)
                if is_valid:
                    dir_report['valid_files'] += 1
                else:
                    # Empty files are tracked separately from corruption.
                    if "empty" in error_msg.lower():
                        dir_report['empty_files'] += 1
                    else:
                        dir_report['corrupted_files'] += 1
                    dir_report['corrupted_files_list'].append({
                        'file': str(file_path),
                        'error': error_msg,
                        'size_mb': file_size_mb
                    })
            health_report[cache_dir] = dir_report
        return health_report

    def cleanup_corrupted_files(self, dry_run: bool = True) -> Dict[str, List[str]]:
        """
        Clean up corrupted cache files.

        Args:
            dry_run: If True, only report what would be deleted

        Returns:
            Dictionary of deleted (or would-be-deleted) files by directory
        """
        deleted_files = {}
        for cache_dir in self.cache_dirs:
            cache_path = Path(cache_dir)
            if not cache_path.exists():
                continue
            deleted_files[cache_dir] = []
            for file_path in cache_path.glob("*.parquet"):
                is_valid, error_msg = self.validate_parquet_file(file_path)
                if not is_valid:
                    if dry_run:
                        deleted_files[cache_dir].append(f"WOULD DELETE: {file_path} ({error_msg})")
                        logger.info(f"Would delete corrupted file: {file_path} ({error_msg})")
                    else:
                        try:
                            file_path.unlink()
                            deleted_files[cache_dir].append(f"DELETED: {file_path} ({error_msg})")
                            logger.info(f"Deleted corrupted file: {file_path}")
                        except Exception as e:
                            # Record the failure rather than aborting the sweep.
                            deleted_files[cache_dir].append(f"FAILED TO DELETE: {file_path} ({e})")
                            logger.error(f"Failed to delete corrupted file {file_path}: {e}")
        return deleted_files

    def cleanup_old_files(self, days_to_keep: int = 7, dry_run: bool = True) -> Dict[str, List[str]]:
        """
        Clean up old cache files.

        Args:
            days_to_keep: Number of days to keep files
            dry_run: If True, only report what would be deleted

        Returns:
            Dictionary of deleted (or would-be-deleted) files by directory
        """
        deleted_files = {}
        # Single timestamp so the cutoff and age calculations agree.
        now = datetime.now()
        cutoff_date = now - timedelta(days=days_to_keep)
        for cache_dir in self.cache_dirs:
            cache_path = Path(cache_dir)
            if not cache_path.exists():
                continue
            deleted_files[cache_dir] = []
            for file_path in cache_path.glob("*.parquet"):
                file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
                if file_mtime < cutoff_date:
                    age_days = (now - file_mtime).days
                    if dry_run:
                        deleted_files[cache_dir].append(f"WOULD DELETE: {file_path} (age: {age_days} days)")
                        logger.info(f"Would delete old file: {file_path} (age: {age_days} days)")
                    else:
                        try:
                            file_path.unlink()
                            deleted_files[cache_dir].append(f"DELETED: {file_path} (age: {age_days} days)")
                            logger.info(f"Deleted old file: {file_path}")
                        except Exception as e:
                            # Record the failure rather than aborting the sweep.
                            deleted_files[cache_dir].append(f"FAILED TO DELETE: {file_path} ({e})")
                            logger.error(f"Failed to delete old file {file_path}: {e}")
        return deleted_files

    def get_cache_summary(self) -> Dict[str, Any]:
        """Get a summary of cache usage aggregated across all directories.

        Returns:
            Dict with total/valid/corrupted file counts, health percentage,
            total size in MB, and the full per-directory health report.
        """
        health_report = self.scan_cache_health()
        total_files = sum(report['total_files'] for report in health_report.values())
        total_valid = sum(report['valid_files'] for report in health_report.values())
        total_corrupted = sum(report['corrupted_files'] for report in health_report.values())
        total_size_mb = sum(report['total_size_mb'] for report in health_report.values())
        return {
            'total_files': total_files,
            'valid_files': total_valid,
            'corrupted_files': total_corrupted,
            # Guard against division by zero when caches are empty.
            'health_percentage': (total_valid / total_files * 100) if total_files > 0 else 0,
            'total_size_mb': total_size_mb,
            'directories': health_report
        }

    def emergency_cache_reset(self, confirm: bool = False) -> bool:
        """
        Emergency cache reset - deletes all cache files.

        Args:
            confirm: Must be True to actually delete files

        Returns:
            True if reset was performed
        """
        if not confirm:
            logger.warning("Emergency cache reset called but not confirmed")
            return False
        deleted_count = 0
        for cache_dir in self.cache_dirs:
            cache_path = Path(cache_dir)
            if not cache_path.exists():
                continue
            # Delete every regular file, not just *.parquet.
            for file_path in cache_path.glob("*"):
                try:
                    if file_path.is_file():
                        file_path.unlink()
                        deleted_count += 1
                except Exception as e:
                    logger.error(f"Failed to delete {file_path}: {e}")
        logger.warning(f"Emergency cache reset completed: deleted {deleted_count} files")
        return True
# Lazily-created singleton CacheManager shared by the convenience functions.
_cache_manager_instance = None


def get_cache_manager() -> CacheManager:
    """Return the global CacheManager, creating it on first use."""
    global _cache_manager_instance
    instance = _cache_manager_instance
    if instance is None:
        instance = CacheManager()
        _cache_manager_instance = instance
    return instance
def cleanup_corrupted_cache(dry_run: bool = True) -> Dict[str, List[str]]:
    """Convenience function to clean up corrupted cache files."""
    return get_cache_manager().cleanup_corrupted_files(dry_run=dry_run)
def get_cache_health() -> Dict[str, Any]:
    """Convenience function to get cache health summary.

    Returns:
        The aggregated summary dict from CacheManager.get_cache_summary().
    """
    # Annotation fixed: `any` is the builtin function, not a type;
    # the correct annotation is typing.Any.
    cache_manager = get_cache_manager()
    return cache_manager.get_cache_summary()