"""
Cache Manager for Trading System

Utilities for managing and cleaning up cache files, including:

- Parquet file validation and repair
- Cache cleanup and maintenance
- Cache health monitoring
"""
|
|
|
|
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class CacheManager:
    """Manages cache files for the trading system.

    Provides Parquet file validation, health scanning of the configured cache
    directories, cleanup of corrupted/old files, and an emergency full reset.
    """

    def __init__(self, cache_dirs: Optional[List[str]] = None):
        """
        Initialize cache manager.

        Args:
            cache_dirs: List of cache directories to manage. Defaults to the
                standard data cache locations used by the trading system.
                Missing directories are created on construction.
        """
        self.cache_dirs = cache_dirs or [
            "data/cache",
            "data/monthly_cache",
            "data/pivot_cache"
        ]

        # Ensure cache directories exist so later scans/cleanups don't fail.
        for cache_dir in self.cache_dirs:
            Path(cache_dir).mkdir(parents=True, exist_ok=True)

    def validate_parquet_file(self, file_path: Path) -> Tuple[bool, Optional[str]]:
        """
        Validate a Parquet file.

        Checks existence, non-zero size, readability, non-empty contents, and
        the presence of the basic OHLCV schema columns.

        Args:
            file_path: Path to the Parquet file

        Returns:
            Tuple of (is_valid, error_message). error_message is None when the
            file is valid.
        """
        try:
            if not file_path.exists():
                return False, "File does not exist"

            if file_path.stat().st_size == 0:
                return False, "File is empty"

            # Actually reading the file is the only reliable corruption check.
            df = pd.read_parquet(file_path)

            if df.empty:
                return False, "File contains no data"

            # Check for required columns (basic OHLCV schema validation).
            required_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
            missing_columns = [col for col in required_columns if col not in df.columns]

            if missing_columns:
                return False, f"Missing required columns: {missing_columns}"

            return True, None

        except Exception as e:
            # Distinguish structural corruption (file worth deleting) from
            # other read failures by matching known error-message fragments.
            error_str = str(e).lower()
            corrupted_indicators = [
                "parquet magic bytes not found",
                "corrupted",
                "couldn't deserialize thrift",
                "don't know what type",
                "invalid parquet file",
                "unexpected end of file",
                "invalid metadata"
            ]

            if any(indicator in error_str for indicator in corrupted_indicators):
                return False, f"Corrupted Parquet file: {e}"
            else:
                return False, f"Validation error: {e}"

    def scan_cache_health(self) -> Dict[str, Dict]:
        """
        Scan all cache directories for file health.

        Returns:
            Mapping of cache directory -> report dict with keys:
            'total_files', 'valid_files', 'corrupted_files', 'empty_files',
            'total_size_mb', 'corrupted_files_list', 'old_files'.
        """
        health_report = {}

        for cache_dir in self.cache_dirs:
            cache_path = Path(cache_dir)
            if not cache_path.exists():
                continue

            dir_report = {
                'total_files': 0,
                'valid_files': 0,
                'corrupted_files': 0,
                'empty_files': 0,
                'total_size_mb': 0.0,
                'corrupted_files_list': [],
                'old_files': []
            }

            # Scan all Parquet files in this directory (non-recursive).
            for file_path in cache_path.glob("*.parquet"):
                dir_report['total_files'] += 1
                # Single stat() call provides both size and mtime.
                stat_result = file_path.stat()
                file_size_mb = stat_result.st_size / (1024 * 1024)
                dir_report['total_size_mb'] += file_size_mb

                # Flag files older than 7 days as candidates for cleanup.
                file_age = datetime.now() - datetime.fromtimestamp(stat_result.st_mtime)
                if file_age > timedelta(days=7):
                    dir_report['old_files'].append({
                        'file': str(file_path),
                        'age_days': file_age.days,
                        'size_mb': file_size_mb
                    })

                # Validate file and bucket the result.
                is_valid, error_msg = self.validate_parquet_file(file_path)

                if is_valid:
                    dir_report['valid_files'] += 1
                else:
                    # Empty files are tracked separately from corrupted ones.
                    if "empty" in error_msg.lower():
                        dir_report['empty_files'] += 1
                    else:
                        dir_report['corrupted_files'] += 1
                        dir_report['corrupted_files_list'].append({
                            'file': str(file_path),
                            'error': error_msg,
                            'size_mb': file_size_mb
                        })

            health_report[cache_dir] = dir_report

        return health_report

    def cleanup_corrupted_files(self, dry_run: bool = True) -> Dict[str, List[str]]:
        """
        Clean up corrupted cache files.

        Args:
            dry_run: If True, only report what would be deleted.

        Returns:
            Dictionary of action messages ("WOULD DELETE" / "DELETED" /
            "FAILED TO DELETE") keyed by cache directory.
        """
        deleted_files = {}

        for cache_dir in self.cache_dirs:
            cache_path = Path(cache_dir)
            if not cache_path.exists():
                continue

            deleted_files[cache_dir] = []

            for file_path in cache_path.glob("*.parquet"):
                is_valid, error_msg = self.validate_parquet_file(file_path)

                if not is_valid:
                    if dry_run:
                        deleted_files[cache_dir].append(f"WOULD DELETE: {file_path} ({error_msg})")
                        logger.info(f"Would delete corrupted file: {file_path} ({error_msg})")
                    else:
                        try:
                            file_path.unlink()
                            deleted_files[cache_dir].append(f"DELETED: {file_path} ({error_msg})")
                            logger.info(f"Deleted corrupted file: {file_path}")
                        except Exception as e:
                            # Record the failure instead of aborting the scan.
                            deleted_files[cache_dir].append(f"FAILED TO DELETE: {file_path} ({e})")
                            logger.error(f"Failed to delete corrupted file {file_path}: {e}")

        return deleted_files

    def cleanup_old_files(self, days_to_keep: int = 7, dry_run: bool = True) -> Dict[str, List[str]]:
        """
        Clean up old cache files.

        Args:
            days_to_keep: Number of days to keep files (by modification time).
            dry_run: If True, only report what would be deleted.

        Returns:
            Dictionary of action messages keyed by cache directory.
        """
        deleted_files = {}
        cutoff_date = datetime.now() - timedelta(days=days_to_keep)

        for cache_dir in self.cache_dirs:
            cache_path = Path(cache_dir)
            if not cache_path.exists():
                continue

            deleted_files[cache_dir] = []

            for file_path in cache_path.glob("*.parquet"):
                file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)

                if file_mtime < cutoff_date:
                    age_days = (datetime.now() - file_mtime).days

                    if dry_run:
                        deleted_files[cache_dir].append(f"WOULD DELETE: {file_path} (age: {age_days} days)")
                        logger.info(f"Would delete old file: {file_path} (age: {age_days} days)")
                    else:
                        try:
                            file_path.unlink()
                            deleted_files[cache_dir].append(f"DELETED: {file_path} (age: {age_days} days)")
                            logger.info(f"Deleted old file: {file_path}")
                        except Exception as e:
                            # Record the failure instead of aborting the scan.
                            deleted_files[cache_dir].append(f"FAILED TO DELETE: {file_path} ({e})")
                            logger.error(f"Failed to delete old file {file_path}: {e}")

        return deleted_files

    def get_cache_summary(self) -> Dict[str, Any]:
        """Get a summary of cache usage aggregated across all directories.

        Returns:
            Dict with totals ('total_files', 'valid_files', 'corrupted_files',
            'total_size_mb'), 'health_percentage' (0 when no files), and the
            per-directory 'directories' report from scan_cache_health().
        """
        health_report = self.scan_cache_health()

        total_files = sum(report['total_files'] for report in health_report.values())
        total_valid = sum(report['valid_files'] for report in health_report.values())
        total_corrupted = sum(report['corrupted_files'] for report in health_report.values())
        total_size_mb = sum(report['total_size_mb'] for report in health_report.values())

        return {
            'total_files': total_files,
            'valid_files': total_valid,
            'corrupted_files': total_corrupted,
            # Guard against division by zero when the caches are empty.
            'health_percentage': (total_valid / total_files * 100) if total_files > 0 else 0,
            'total_size_mb': total_size_mb,
            'directories': health_report
        }

    def emergency_cache_reset(self, confirm: bool = False) -> bool:
        """
        Emergency cache reset - deletes all cache files.

        Args:
            confirm: Must be True to actually delete files; acts as a
                safety latch against accidental invocation.

        Returns:
            True if reset was performed, False when not confirmed.
        """
        if not confirm:
            logger.warning("Emergency cache reset called but not confirmed")
            return False

        deleted_count = 0

        for cache_dir in self.cache_dirs:
            cache_path = Path(cache_dir)
            if not cache_path.exists():
                continue

            # Delete every regular file (not just *.parquet) in the cache dir.
            for file_path in cache_path.glob("*"):
                try:
                    if file_path.is_file():
                        file_path.unlink()
                        deleted_count += 1
                except Exception as e:
                    logger.error(f"Failed to delete {file_path}: {e}")

        logger.warning(f"Emergency cache reset completed: deleted {deleted_count} files")
        return True
|
|
|
|
# Lazily-created, process-wide cache manager singleton.
_cache_manager_instance = None

def get_cache_manager() -> CacheManager:
    """Return the shared CacheManager, creating it on first access."""
    global _cache_manager_instance
    if _cache_manager_instance is None:
        _cache_manager_instance = CacheManager()
    return _cache_manager_instance
|
|
|
|
def cleanup_corrupted_cache(dry_run: bool = True) -> Dict[str, List[str]]:
    """Module-level shortcut: run corrupted-file cleanup via the global manager."""
    return get_cache_manager().cleanup_corrupted_files(dry_run=dry_run)
|
|
|
|
def get_cache_health() -> Dict[str, Any]:
    """Convenience function to get the cache health summary.

    Returns:
        The aggregated summary dict from CacheManager.get_cache_summary().
    """
    # Fixed annotation: the original used the builtin `any` function where
    # `typing.Any` was intended.
    cache_manager = get_cache_manager()
    return cache_manager.get_cache_summary()