"""Simple rate limiter for API requests."""

import time
from collections import defaultdict
from typing import Any, Dict, List


class RateLimiter:
    """Sliding-window rate limiter keyed by client id.

    Each client may make at most ``requests_per_minute`` requests within any
    trailing 60-second window. Accepted request timestamps (``time.time()``
    values) are kept per client in ``self.requests``.

    NOTE: ``burst_size`` is stored and reported by ``get_global_stats`` but is
    not enforced by ``is_allowed``.
    """

    # Length of the sliding window, in seconds.
    _WINDOW_SECONDS = 60

    def __init__(self, requests_per_minute: int = 100, burst_size: int = 20):
        """Create a limiter.

        Args:
            requests_per_minute: Maximum accepted requests per client within
                the trailing window.
            burst_size: Informational only; surfaced in the global stats.
        """
        self.requests_per_minute = requests_per_minute
        self.burst_size = burst_size
        # client_id -> timestamps of accepted requests, oldest first.
        self.requests: Dict[str, List[float]] = defaultdict(list)

    def _prune(self, client_id: str, now: float) -> List[float]:
        """Drop expired timestamps for *client_id* and return the live ones.

        Uses ``dict.get`` so that looking up an unknown client never inserts
        an entry, and evicts the client entirely once nothing remains, which
        keeps ``self.requests`` from growing with idle or never-seen ids.
        """
        timestamps = self.requests.get(client_id)
        if timestamps is None:
            return []

        cutoff = now - self._WINDOW_SECONDS
        recent = [stamp for stamp in timestamps if stamp > cutoff]
        if recent:
            self.requests[client_id] = recent
        else:
            del self.requests[client_id]
        return recent

    def is_allowed(self, client_id: str) -> bool:
        """Record a request for *client_id* if it is within the limit.

        Returns:
            True if the request was accepted (and recorded), False if the
            client already has ``requests_per_minute`` requests in the window.
        """
        now = time.time()
        recent = self._prune(client_id, now)

        if len(recent) >= self.requests_per_minute:
            return False

        # defaultdict (re)creates the list if the client was unknown/evicted.
        self.requests[client_id].append(now)
        return True

    def get_client_stats(self, client_id: str) -> Dict[str, Any]:
        """Return rate limiting stats for a specific client.

        Querying a client that has no recent requests does not create an
        entry for it.

        Returns:
            A dict with the keys ``client_id``, ``current_requests``,
            ``remaining_tokens``, ``requests_per_minute``, ``reset_time``,
            ``window_start`` and ``window_end``.
        """
        now = time.time()
        current_requests = len(self._prune(client_id, now))
        remaining_tokens = max(0, self.requests_per_minute - current_requests)

        # NOTE: this is the next wall-clock minute boundary, not the moment
        # the oldest request leaves the sliding window; kept as-is for
        # compatibility with existing consumers of this field.
        whole_seconds = int(now)
        window = self._WINDOW_SECONDS
        reset_time = whole_seconds + (window - whole_seconds % window)

        return {
            'client_id': client_id,
            'current_requests': current_requests,
            'remaining_tokens': remaining_tokens,
            'requests_per_minute': self.requests_per_minute,
            'reset_time': reset_time,
            'window_start': now - window,
            'window_end': now,
        }

    def get_global_stats(self) -> Dict[str, Any]:
        """Return global rate limiting statistics.

        ``total_clients`` is the number of clients tracked when the call
        starts (before idle ones are evicted); ``active_clients`` counts only
        clients with at least one request inside the window.
        """
        now = time.time()
        total_clients = len(self.requests)
        total_requests = 0
        active_clients = 0

        # Iterate over a snapshot of the keys: _prune may delete entries.
        for client_id in list(self.requests):
            client_requests = len(self._prune(client_id, now))
            if client_requests:
                active_clients += 1
                total_requests += client_requests

        return {
            'total_clients': total_clients,
            'active_clients': active_clients,
            'total_requests_last_minute': total_requests,
            'requests_per_minute_limit': self.requests_per_minute,
            'burst_size': self.burst_size,
            'window_duration': self._WINDOW_SECONDS,
        }