""" Timing utilities for the COBY system. """ import time from datetime import datetime, timezone from typing import Optional def get_current_timestamp() -> datetime: """ Get current UTC timestamp. Returns: datetime: Current UTC timestamp """ return datetime.now(timezone.utc) def format_timestamp(timestamp: datetime, format_str: str = "%Y-%m-%d %H:%M:%S.%f") -> str: """ Format timestamp to string. Args: timestamp: Timestamp to format format_str: Format string Returns: str: Formatted timestamp string """ return timestamp.strftime(format_str) def parse_timestamp(timestamp_str: str, format_str: str = "%Y-%m-%d %H:%M:%S.%f") -> datetime: """ Parse timestamp string to datetime. Args: timestamp_str: Timestamp string to parse format_str: Format string Returns: datetime: Parsed timestamp """ dt = datetime.strptime(timestamp_str, format_str) # Ensure timezone awareness if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt def timestamp_to_unix(timestamp: datetime) -> float: """ Convert datetime to Unix timestamp. Args: timestamp: Datetime to convert Returns: float: Unix timestamp """ return timestamp.timestamp() def unix_to_timestamp(unix_time: float) -> datetime: """ Convert Unix timestamp to datetime. Args: unix_time: Unix timestamp Returns: datetime: Converted datetime (UTC) """ return datetime.fromtimestamp(unix_time, tz=timezone.utc) def calculate_time_diff(start: datetime, end: datetime) -> float: """ Calculate time difference in seconds. Args: start: Start timestamp end: End timestamp Returns: float: Time difference in seconds """ return (end - start).total_seconds() def is_timestamp_recent(timestamp: datetime, max_age_seconds: int = 60) -> bool: """ Check if timestamp is recent (within max_age_seconds). Args: timestamp: Timestamp to check max_age_seconds: Maximum age in seconds Returns: bool: True if recent, False otherwise """ now = get_current_timestamp() age = calculate_time_diff(timestamp, now) return age <= max_age_seconds def sleep_until(target_time: datetime) -> None: """ Sleep until target time. Args: target_time: Target timestamp to sleep until """ now = get_current_timestamp() sleep_seconds = calculate_time_diff(now, target_time) if sleep_seconds > 0: time.sleep(sleep_seconds) def get_milliseconds() -> int: """ Get current timestamp in milliseconds. Returns: int: Current timestamp in milliseconds """ return int(time.time() * 1000) def milliseconds_to_timestamp(ms: int) -> datetime: """ Convert milliseconds to datetime. Args: ms: Milliseconds timestamp Returns: datetime: Converted datetime (UTC) """ return datetime.fromtimestamp(ms / 1000.0, tz=timezone.utc) def round_timestamp(timestamp: datetime, seconds: int) -> datetime: """ Round timestamp to nearest interval. Args: timestamp: Timestamp to round seconds: Interval in seconds Returns: datetime: Rounded timestamp """ unix_time = timestamp_to_unix(timestamp) rounded_unix = round(unix_time / seconds) * seconds return unix_to_timestamp(rounded_unix) class Timer: """Simple timer for measuring execution time""" def __init__(self): self.start_time: Optional[float] = None self.end_time: Optional[float] = None def start(self) -> None: """Start the timer""" self.start_time = time.perf_counter() self.end_time = None def stop(self) -> float: """ Stop the timer and return elapsed time. Returns: float: Elapsed time in seconds """ if self.start_time is None: raise ValueError("Timer not started") self.end_time = time.perf_counter() return self.elapsed() def elapsed(self) -> float: """ Get elapsed time. Returns: float: Elapsed time in seconds """ if self.start_time is None: return 0.0 end = self.end_time or time.perf_counter() return end - self.start_time def __enter__(self): """Context manager entry""" self.start() return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit""" self.stop()