"""
Cache Manager for Trading System

Utilities for managing and cleaning up Parquet cache files, including:
- Parquet file validation (files that fail validation can be deleted;
  there is no in-place repair)
- Cache cleanup and maintenance (corrupted files, old files, full reset)
- Cache health monitoring
"""

import os
import logging
import pandas as pd
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)

# Messages returned by CacheManager.validate_parquet_file for the simple
# (non-exception) failure cases. scan_cache_health compares against these
# exactly instead of substring-matching, so an exception text that merely
# contains the word "empty" is not misfiled.
_MSG_NOT_FOUND = "File does not exist"
_MSG_EMPTY_FILE = "File is empty"
_MSG_NO_DATA = "File contains no data"
# Failures that mean "no usable data" rather than "damaged file".
_EMPTY_MESSAGES = frozenset({_MSG_EMPTY_FILE, _MSG_NO_DATA})

# Lower-cased fragments of Parquet reader exception texts that indicate the
# file itself is damaged (as opposed to some other read problem).
_CORRUPTION_INDICATORS = (
    "parquet magic bytes not found",
    "corrupted",
    "couldn't deserialize thrift",
    "don't know what type",
    "invalid parquet file",
    "unexpected end of file",
    "invalid metadata",
)


class CacheManager:
    """Manages Parquet cache files for the trading system."""

    # Columns a cache file must provide to be considered valid, either as a
    # regular column or as a named index level. Override per instance via the
    # ``required_columns`` constructor argument, or per call on
    # ``validate_parquet_file``.
    DEFAULT_REQUIRED_COLUMNS: Tuple[str, ...] = (
        "timestamp", "open", "high", "low", "close", "volume",
    )

    def __init__(
        self,
        cache_dirs: Optional[List[str]] = None,
        required_columns: Optional[Sequence[str]] = None,
    ):
        """
        Initialize the cache manager and create any missing cache directories.

        Args:
            cache_dirs: Cache directories to manage. ``None`` selects the
                defaults (``data/cache``, ``data/monthly_cache``,
                ``data/pivot_cache``); an explicit empty list manages nothing.
            required_columns: Columns every cache file must contain.
                ``None`` uses ``DEFAULT_REQUIRED_COLUMNS``.
        """
        if cache_dirs is None:
            cache_dirs = ["data/cache", "data/monthly_cache", "data/pivot_cache"]
        # Copy so later mutation of the caller's list does not leak in.
        self.cache_dirs: List[str] = list(cache_dirs)
        self.required_columns: Tuple[str, ...] = (
            self.DEFAULT_REQUIRED_COLUMNS
            if required_columns is None
            else tuple(required_columns)
        )

        # Side effect: ensure every managed directory exists.
        for cache_dir in self.cache_dirs:
            Path(cache_dir).mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------ #
    # Internal helpers
    # ------------------------------------------------------------------ #
    def _existing_cache_paths(self) -> Iterator[Tuple[str, Path]]:
        """Yield ``(cache_dir, Path(cache_dir))`` for each managed directory that exists."""
        for cache_dir in self.cache_dirs:
            cache_path = Path(cache_dir)
            if cache_path.exists():
                yield cache_dir, cache_path

    @staticmethod
    def _parquet_files(cache_path: Path) -> List[Path]:
        """Return the ``*.parquet`` entries directly inside *cache_path*.

        The list is materialized (so callers may delete while looping) and
        sorted (so reports are deterministic).
        """
        return sorted(cache_path.glob("*.parquet"))

    @staticmethod
    def _missing_columns(df: pd.DataFrame, required: Sequence[str]) -> List[str]:
        """Return the entries of *required* that are neither a column nor a named index level of *df*."""
        available = set(df.columns)
        # A timestamp stored as the index is still present in the data.
        available.update(name for name in df.index.names if name is not None)
        return [col for col in required if col not in available]

    @staticmethod
    def _remove_file(file_path: Path, label: str, detail: str, dry_run: bool) -> str:
        """Delete (or pretend to delete) one file and return its report line.

        Args:
            file_path: File to delete.
            label: Word used in log messages, e.g. ``"corrupted"`` or ``"old"``.
            detail: Reason shown in parentheses in the report line.
            dry_run: If True, only log and report what would be deleted.

        Returns:
            A string starting with ``WOULD DELETE:``, ``DELETED:`` or
            ``FAILED TO DELETE:``.
        """
        if dry_run:
            logger.info("Would delete %s file: %s (%s)", label, file_path, detail)
            return f"WOULD DELETE: {file_path} ({detail})"
        try:
            file_path.unlink()
        except OSError as e:
            logger.error("Failed to delete %s file %s: %s", label, file_path, e)
            return f"FAILED TO DELETE: {file_path} ({e})"
        logger.info("Deleted %s file: %s", label, file_path)
        return f"DELETED: {file_path} ({detail})"

    # ------------------------------------------------------------------ #
    # Public API
    # ------------------------------------------------------------------ #
    def validate_parquet_file(
        self,
        file_path: Path,
        required_columns: Optional[Sequence[str]] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate a Parquet cache file by reading it fully with pandas.

        Args:
            file_path: Path to the Parquet file (``str`` is also accepted).
            required_columns: Columns the file must contain (as columns or
                named index levels). ``None`` uses ``self.required_columns``.

        Returns:
            Tuple of (is_valid, error_message); error_message is None when valid.
            Reader exceptions are never raised. They are reported as
            ``"Corrupted Parquet file: ..."`` when the text matches a known
            corruption indicator, otherwise as ``"Validation error: ..."``.
        """
        try:
            path = Path(file_path)
            required = (
                self.required_columns
                if required_columns is None
                else tuple(required_columns)
            )

            if not path.exists():
                return False, _MSG_NOT_FOUND

            if path.stat().st_size == 0:
                return False, _MSG_EMPTY_FILE

            # Reading the whole file (not just metadata) also surfaces damaged data pages.
            df = pd.read_parquet(path)

            if df.empty:
                return False, _MSG_NO_DATA

            missing_columns = self._missing_columns(df, required)
            if missing_columns:
                return False, f"Missing required columns: {missing_columns}"

            return True, None

        except Exception as e:  # reader exception types vary by engine
            error_str = str(e).lower()
            if any(indicator in error_str for indicator in _CORRUPTION_INDICATORS):
                return False, f"Corrupted Parquet file: {e}"
            return False, f"Validation error: {e}"

    def scan_cache_health(self, old_file_days: int = 7) -> Dict[str, Dict]:
        """
        Scan all existing cache directories and report per-directory file health.

        Args:
            old_file_days: Files whose modification time is more than this many
                days ago are listed under ``old_files``.

        Returns:
            Mapping of cache directory -> report dict with keys
            ``total_files``, ``valid_files``, ``corrupted_files``,
            ``empty_files``, ``total_size_mb``, ``corrupted_files_list`` and
            ``old_files``. Files that are empty (zero bytes or zero rows) count
            as ``empty_files``; every other invalid file counts as corrupted
            and appears in ``corrupted_files_list``.
        """
        health_report: Dict[str, Dict] = {}
        old_threshold = timedelta(days=old_file_days)

        for cache_dir, cache_path in self._existing_cache_paths():
            dir_report = {
                'total_files': 0,
                'valid_files': 0,
                'corrupted_files': 0,
                'empty_files': 0,
                'total_size_mb': 0.0,
                'corrupted_files_list': [],
                'old_files': []
            }

            for file_path in self._parquet_files(cache_path):
                try:
                    stat = file_path.stat()
                except OSError as e:
                    # The file vanished (or became unreadable) after the listing.
                    logger.warning("Skipping %s during health scan: %s", file_path, e)
                    continue

                dir_report['total_files'] += 1
                file_size_mb = stat.st_size / (1024 * 1024)
                dir_report['total_size_mb'] += file_size_mb

                file_age = datetime.now() - datetime.fromtimestamp(stat.st_mtime)
                if file_age > old_threshold:
                    dir_report['old_files'].append({
                        'file': str(file_path),
                        'age_days': file_age.days,
                        'size_mb': file_size_mb
                    })

                is_valid, error_msg = self.validate_parquet_file(file_path)
                if is_valid:
                    dir_report['valid_files'] += 1
                elif error_msg in _EMPTY_MESSAGES:
                    dir_report['empty_files'] += 1
                else:
                    dir_report['corrupted_files'] += 1
                    dir_report['corrupted_files_list'].append({
                        'file': str(file_path),
                        'error': error_msg,
                        'size_mb': file_size_mb
                    })

            health_report[cache_dir] = dir_report

        return health_report

    def cleanup_corrupted_files(self, dry_run: bool = True) -> Dict[str, List[str]]:
        """
        Delete every Parquet file that fails ``validate_parquet_file``.

        Note this includes empty files and files missing required columns, not
        only files with damaged bytes.

        Args:
            dry_run: If True, only report what would be deleted.

        Returns:
            Mapping of cache directory -> report lines
            (``WOULD DELETE: ...`` / ``DELETED: ...`` / ``FAILED TO DELETE: ...``).
        """
        deleted_files: Dict[str, List[str]] = {}

        for cache_dir, cache_path in self._existing_cache_paths():
            results: List[str] = []
            deleted_files[cache_dir] = results
            for file_path in self._parquet_files(cache_path):
                is_valid, error_msg = self.validate_parquet_file(file_path)
                if not is_valid:
                    results.append(
                        self._remove_file(file_path, "corrupted", str(error_msg), dry_run)
                    )

        return deleted_files

    def cleanup_old_files(self, days_to_keep: int = 7, dry_run: bool = True) -> Dict[str, List[str]]:
        """
        Delete Parquet files last modified more than *days_to_keep* days ago.

        Args:
            days_to_keep: Files modified within this many days are kept.
            dry_run: If True, only report what would be deleted.

        Returns:
            Mapping of cache directory -> report lines
            (``WOULD DELETE: ...`` / ``DELETED: ...`` / ``FAILED TO DELETE: ...``).
        """
        deleted_files: Dict[str, List[str]] = {}
        now = datetime.now()
        cutoff_date = now - timedelta(days=days_to_keep)

        for cache_dir, cache_path in self._existing_cache_paths():
            results: List[str] = []
            deleted_files[cache_dir] = results
            for file_path in self._parquet_files(cache_path):
                try:
                    file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
                except OSError:
                    continue  # vanished since the listing; nothing to delete

                if file_mtime < cutoff_date:
                    age_days = (now - file_mtime).days
                    results.append(
                        self._remove_file(file_path, "old", f"age: {age_days} days", dry_run)
                    )

        return deleted_files

    def get_cache_summary(self) -> Dict[str, Any]:
        """
        Summarize cache usage across all managed directories.

        Returns:
            Dict with totals (``total_files``, ``valid_files``,
            ``corrupted_files``, ``empty_files``, ``total_size_mb``),
            ``health_percentage`` (valid / total * 100, or 0 when there are no
            files) and the full per-directory report under ``directories``.
        """
        health_report = self.scan_cache_health()
        reports = list(health_report.values())

        total_files = sum(report['total_files'] for report in reports)
        total_valid = sum(report['valid_files'] for report in reports)
        total_corrupted = sum(report['corrupted_files'] for report in reports)
        total_empty = sum(report['empty_files'] for report in reports)
        total_size_mb = sum(report['total_size_mb'] for report in reports)

        return {
            'total_files': total_files,
            'valid_files': total_valid,
            'corrupted_files': total_corrupted,
            'empty_files': total_empty,
            'health_percentage': (total_valid / total_files * 100) if total_files > 0 else 0,
            'total_size_mb': total_size_mb,
            'directories': health_report
        }

    def emergency_cache_reset(self, confirm: bool = False) -> bool:
        """
        Emergency cache reset: delete every regular file directly inside each
        existing cache directory (any extension; subdirectories are untouched).

        Args:
            confirm: Must be True to actually delete files.

        Returns:
            True if the reset ran (even if some individual deletions failed;
            those are logged), False if it was not confirmed.
        """
        if not confirm:
            logger.warning("Emergency cache reset called but not confirmed")
            return False

        deleted_count = 0
        for _cache_dir, cache_path in self._existing_cache_paths():
            for file_path in list(cache_path.glob("*")):
                try:
                    if file_path.is_file():
                        file_path.unlink()
                        deleted_count += 1
                except OSError as e:
                    logger.error("Failed to delete %s: %s", file_path, e)

        logger.warning("Emergency cache reset completed: deleted %d files", deleted_count)
        return True


# Global cache manager instance, created lazily by get_cache_manager().
_cache_manager_instance: Optional[CacheManager] = None


def get_cache_manager() -> CacheManager:
    """Return the module-wide CacheManager, creating it with default dirs on first use.

    Creation is not guarded by a lock.
    """
    global _cache_manager_instance
    if _cache_manager_instance is None:
        _cache_manager_instance = CacheManager()
    return _cache_manager_instance


def cleanup_corrupted_cache(dry_run: bool = True) -> Dict[str, List[str]]:
    """Convenience wrapper: run ``cleanup_corrupted_files`` on the global manager."""
    cache_manager = get_cache_manager()
    return cache_manager.cleanup_corrupted_files(dry_run=dry_run)


def get_cache_health() -> Dict[str, Any]:
    """Convenience wrapper: return ``get_cache_summary`` from the global manager."""
    cache_manager = get_cache_manager()
    return cache_manager.get_cache_summary()