156 lines
5.8 KiB
Python
156 lines
5.8 KiB
Python
"""
Text File Logger for Trading System

Simple text file logging for tracking inference records and system events.
Provides human-readable logs alongside database storage.
"""
|
|
|
|
import os
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)


class TextLogger:
    """Append-only, human-readable text logs for trading system events.

    Three files are kept inside ``log_dir``:

    * ``inference_records.txt`` - one line per logged inference
    * ``checkpoint_events.txt`` - one line per checkpoint event
    * ``system_events.txt``     - one line per general system event

    The ``log_*``, ``get_recent_*`` and ``cleanup_old_logs`` methods never
    raise: a failure is reported through the module logger and signalled by
    the return value. The constructor, in contrast, lets directory-creation
    errors propagate.
    """

    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, log_dir: str = "logs"):
        """Create ``log_dir`` if needed and compute the three log file paths.

        Args:
            log_dir: Directory that holds the log files.

        Raises:
            OSError: If the directory cannot be created.
        """
        self.log_dir = Path(log_dir)
        # parents=True so a nested directory such as "var/logs/trading" works
        # even when its ancestors do not exist yet.
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # Separate log files for the different types of events
        self.inference_log = self.log_dir / "inference_records.txt"
        self.checkpoint_log = self.log_dir / "checkpoint_events.txt"
        self.system_log = self.log_dir / "system_events.txt"

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @classmethod
    def _timestamp(cls) -> str:
        """Return the current local time formatted for a log line."""
        return datetime.now().strftime(cls._TIMESTAMP_FORMAT)

    @staticmethod
    def _append(path: Path, entry: str) -> None:
        """Append ``entry`` to ``path``; closing the file flushes it."""
        with open(path, 'a', encoding='utf-8') as f:
            f.write(entry)

    @staticmethod
    def _read_recent(path: Path, lines: int, missing_message: str,
                     label: str) -> str:
        """Return the last ``lines`` lines of ``path`` as one string.

        Returns ``missing_message`` when the file does not exist, an empty
        string when ``lines`` is zero or negative, and an
        ``"Error reading log: ..."`` string when reading fails.
        """
        try:
            if not path.exists():
                return missing_message

            # Guard explicitly: all_lines[-0:] would be the WHOLE list and a
            # negative count would drop lines from the head instead.
            if lines <= 0:
                return ""

            with open(path, 'r', encoding='utf-8') as f:
                all_lines = f.readlines()
            # Slicing already copes with lines > len(all_lines).
            return ''.join(all_lines[-lines:])

        except Exception as e:
            logger.error("Failed to read %s log: %s", label, e)
            return f"Error reading log: {e}"

    # ------------------------------------------------------------------
    # Writers
    # ------------------------------------------------------------------

    def log_inference(self, model_name: str, symbol: str, action: str,
                      confidence: float, processing_time_ms: float,
                      checkpoint_id: Optional[str] = None) -> bool:
        """Append one inference record to the inference log.

        The line has the shape
        ``<timestamp> | <model> | <symbol> | <action> | conf=0.000 |
        time=   0.0ms`` followed by `` [checkpoint: <id>]`` when
        ``checkpoint_id`` is truthy.

        Returns:
            True if the line was written, False if formatting or writing
            failed (the error is logged).
        """
        try:
            checkpoint_info = (
                f" [checkpoint: {checkpoint_id}]" if checkpoint_id else ""
            )
            log_entry = (
                f"{self._timestamp()} | {model_name:15} | {symbol:10} | "
                f"{action:4} | conf={confidence:.3f} | "
                f"time={processing_time_ms:6.1f}ms{checkpoint_info}\n"
            )
            self._append(self.inference_log, log_entry)
            return True

        except Exception as e:
            logger.error("Failed to log inference to text file: %s", e)
            return False

    def log_checkpoint_event(self, model_name: str, event_type: str,
                             checkpoint_id: str, details: str = "") -> bool:
        """Append one checkpoint event (save, load, etc.) to the checkpoint log.

        ``details`` is appended as `` - <details>`` when non-empty.

        Returns:
            True if the line was written, False otherwise (the error is
            logged).
        """
        try:
            details_str = f" - {details}" if details else ""
            log_entry = (
                f"{self._timestamp()} | {model_name:15} | {event_type:10} | "
                f"{checkpoint_id}{details_str}\n"
            )
            self._append(self.checkpoint_log, log_entry)
            return True

        except Exception as e:
            logger.error("Failed to log checkpoint event to text file: %s", e)
            return False

    def log_system_event(self, event_type: str, message: str,
                         component: str = "system") -> bool:
        """Append one general system event to the system log.

        Returns:
            True if the line was written, False otherwise (the error is
            logged).
        """
        try:
            log_entry = (
                f"{self._timestamp()} | {component:15} | {event_type:10} | "
                f"{message}\n"
            )
            self._append(self.system_log, log_entry)
            return True

        except Exception as e:
            logger.error("Failed to log system event to text file: %s", e)
            return False

    # ------------------------------------------------------------------
    # Readers
    # ------------------------------------------------------------------

    def get_recent_inferences(self, lines: int = 50) -> str:
        """Return up to the last ``lines`` inference records as one string.

        Returns ``"No inference records found"`` when the file is missing
        and an empty string when ``lines`` is zero or negative.
        """
        return self._read_recent(
            self.inference_log, lines, "No inference records found", "inference"
        )

    def get_recent_checkpoint_events(self, lines: int = 20) -> str:
        """Return up to the last ``lines`` checkpoint events as one string.

        Returns ``"No checkpoint events found"`` when the file is missing
        and an empty string when ``lines`` is zero or negative.
        """
        return self._read_recent(
            self.checkpoint_log, lines, "No checkpoint events found", "checkpoint"
        )

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    def cleanup_old_logs(self, max_lines: int = 10000) -> bool:
        """Trim every log file to its most recent ``max_lines`` lines.

        Files that are missing or already short enough are left untouched.
        ``max_lines=0`` empties the files; a negative value is rejected.

        Returns:
            True when all files were processed, False when ``max_lines`` is
            negative or any read/write failed (the error is logged).
        """
        try:
            if max_lines < 0:
                logger.error(
                    "Failed to cleanup logs: max_lines must be >= 0, got %s",
                    max_lines,
                )
                return False

            for log_file in (self.inference_log, self.checkpoint_log,
                             self.system_log):
                if not log_file.exists():
                    continue

                with open(log_file, 'r', encoding='utf-8') as f:
                    lines = f.readlines()

                if len(lines) <= max_lines:
                    continue

                # lines[-0:] is the whole list, so zero needs its own branch.
                recent_lines = lines[-max_lines:] if max_lines else []
                with open(log_file, 'w', encoding='utf-8') as f:
                    f.writelines(recent_lines)

                logger.info(
                    "Cleaned up %s: kept %d lines",
                    log_file.name, len(recent_lines),
                )

            return True

        except Exception as e:
            logger.error("Failed to cleanup logs: %s", e)
            return False
|
|
|
|
# Process-wide TextLogger, created lazily by get_text_logger()
_text_logger_instance = None


def get_text_logger(log_dir: str = "logs") -> TextLogger:
    """Return the shared TextLogger, building it on the first call.

    ``log_dir`` is only consulted when the shared instance does not exist
    yet; once it has been created, every later call returns that same
    instance regardless of the ``log_dir`` passed in.
    """
    global _text_logger_instance

    # Fast path: the instance was already built by an earlier call.
    if _text_logger_instance is not None:
        return _text_logger_instance

    _text_logger_instance = TextLogger(log_dir)
    return _text_logger_instance