206 lines
4.7 KiB
Python
206 lines
4.7 KiB
Python
"""
|
|
Timing utilities for the COBY system.
|
|
"""
|
|
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
|
|
def get_current_timestamp() -> datetime:
|
|
"""
|
|
Get current UTC timestamp.
|
|
|
|
Returns:
|
|
datetime: Current UTC timestamp
|
|
"""
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
def format_timestamp(timestamp: datetime, format_str: str = "%Y-%m-%d %H:%M:%S.%f") -> str:
|
|
"""
|
|
Format timestamp to string.
|
|
|
|
Args:
|
|
timestamp: Timestamp to format
|
|
format_str: Format string
|
|
|
|
Returns:
|
|
str: Formatted timestamp string
|
|
"""
|
|
return timestamp.strftime(format_str)
|
|
|
|
|
|
def parse_timestamp(timestamp_str: str, format_str: str = "%Y-%m-%d %H:%M:%S.%f") -> datetime:
|
|
"""
|
|
Parse timestamp string to datetime.
|
|
|
|
Args:
|
|
timestamp_str: Timestamp string to parse
|
|
format_str: Format string
|
|
|
|
Returns:
|
|
datetime: Parsed timestamp
|
|
"""
|
|
dt = datetime.strptime(timestamp_str, format_str)
|
|
# Ensure timezone awareness
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
return dt
|
|
|
|
|
|
def timestamp_to_unix(timestamp: datetime) -> float:
|
|
"""
|
|
Convert datetime to Unix timestamp.
|
|
|
|
Args:
|
|
timestamp: Datetime to convert
|
|
|
|
Returns:
|
|
float: Unix timestamp
|
|
"""
|
|
return timestamp.timestamp()
|
|
|
|
|
|
def unix_to_timestamp(unix_time: float) -> datetime:
|
|
"""
|
|
Convert Unix timestamp to datetime.
|
|
|
|
Args:
|
|
unix_time: Unix timestamp
|
|
|
|
Returns:
|
|
datetime: Converted datetime (UTC)
|
|
"""
|
|
return datetime.fromtimestamp(unix_time, tz=timezone.utc)
|
|
|
|
|
|
def calculate_time_diff(start: datetime, end: datetime) -> float:
|
|
"""
|
|
Calculate time difference in seconds.
|
|
|
|
Args:
|
|
start: Start timestamp
|
|
end: End timestamp
|
|
|
|
Returns:
|
|
float: Time difference in seconds
|
|
"""
|
|
return (end - start).total_seconds()
|
|
|
|
|
|
def is_timestamp_recent(timestamp: datetime, max_age_seconds: int = 60) -> bool:
|
|
"""
|
|
Check if timestamp is recent (within max_age_seconds).
|
|
|
|
Args:
|
|
timestamp: Timestamp to check
|
|
max_age_seconds: Maximum age in seconds
|
|
|
|
Returns:
|
|
bool: True if recent, False otherwise
|
|
"""
|
|
now = get_current_timestamp()
|
|
age = calculate_time_diff(timestamp, now)
|
|
return age <= max_age_seconds
|
|
|
|
|
|
def sleep_until(target_time: datetime) -> None:
|
|
"""
|
|
Sleep until target time.
|
|
|
|
Args:
|
|
target_time: Target timestamp to sleep until
|
|
"""
|
|
now = get_current_timestamp()
|
|
sleep_seconds = calculate_time_diff(now, target_time)
|
|
|
|
if sleep_seconds > 0:
|
|
time.sleep(sleep_seconds)
|
|
|
|
|
|
def get_milliseconds() -> int:
|
|
"""
|
|
Get current timestamp in milliseconds.
|
|
|
|
Returns:
|
|
int: Current timestamp in milliseconds
|
|
"""
|
|
return int(time.time() * 1000)
|
|
|
|
|
|
def milliseconds_to_timestamp(ms: int) -> datetime:
|
|
"""
|
|
Convert milliseconds to datetime.
|
|
|
|
Args:
|
|
ms: Milliseconds timestamp
|
|
|
|
Returns:
|
|
datetime: Converted datetime (UTC)
|
|
"""
|
|
return datetime.fromtimestamp(ms / 1000.0, tz=timezone.utc)
|
|
|
|
|
|
def round_timestamp(timestamp: datetime, seconds: int) -> datetime:
|
|
"""
|
|
Round timestamp to nearest interval.
|
|
|
|
Args:
|
|
timestamp: Timestamp to round
|
|
seconds: Interval in seconds
|
|
|
|
Returns:
|
|
datetime: Rounded timestamp
|
|
"""
|
|
unix_time = timestamp_to_unix(timestamp)
|
|
rounded_unix = round(unix_time / seconds) * seconds
|
|
return unix_to_timestamp(rounded_unix)
|
|
|
|
|
|
class Timer:
|
|
"""Simple timer for measuring execution time"""
|
|
|
|
def __init__(self):
|
|
self.start_time: Optional[float] = None
|
|
self.end_time: Optional[float] = None
|
|
|
|
def start(self) -> None:
|
|
"""Start the timer"""
|
|
self.start_time = time.perf_counter()
|
|
self.end_time = None
|
|
|
|
def stop(self) -> float:
|
|
"""
|
|
Stop the timer and return elapsed time.
|
|
|
|
Returns:
|
|
float: Elapsed time in seconds
|
|
"""
|
|
if self.start_time is None:
|
|
raise ValueError("Timer not started")
|
|
|
|
self.end_time = time.perf_counter()
|
|
return self.elapsed()
|
|
|
|
def elapsed(self) -> float:
|
|
"""
|
|
Get elapsed time.
|
|
|
|
Returns:
|
|
float: Elapsed time in seconds
|
|
"""
|
|
if self.start_time is None:
|
|
return 0.0
|
|
|
|
end = self.end_time or time.perf_counter()
|
|
return end - self.start_time
|
|
|
|
def __enter__(self):
|
|
"""Context manager entry"""
|
|
self.start()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Context manager exit"""
|
|
self.stop() |