""" Database connection pool management for TimescaleDB. """ import asyncio import asyncpg from typing import Optional, Dict, Any from contextlib import asynccontextmanager from ..config import config from ..utils.logging import get_logger from ..utils.exceptions import StorageError logger = get_logger(__name__) class DatabaseConnectionPool: """Manages database connection pool for TimescaleDB""" def __init__(self): self._pool: Optional[asyncpg.Pool] = None self._is_initialized = False async def initialize(self) -> None: """Initialize the connection pool""" if self._is_initialized: return try: # Build connection string dsn = ( f"postgresql://{config.database.user}:{config.database.password}" f"@{config.database.host}:{config.database.port}/{config.database.name}" ) # Create connection pool self._pool = await asyncpg.create_pool( dsn, min_size=5, max_size=config.database.pool_size, max_queries=50000, max_inactive_connection_lifetime=300, command_timeout=config.database.pool_timeout, server_settings={ 'search_path': config.database.schema, 'timezone': 'UTC' } ) self._is_initialized = True logger.info(f"Database connection pool initialized with {config.database.pool_size} connections") # Test connection await self.health_check() except Exception as e: logger.error(f"Failed to initialize database connection pool: {e}") raise StorageError(f"Database connection failed: {e}", "DB_INIT_ERROR") async def close(self) -> None: """Close the connection pool""" if self._pool: await self._pool.close() self._pool = None self._is_initialized = False logger.info("Database connection pool closed") @asynccontextmanager async def get_connection(self): """Get a database connection from the pool""" if not self._is_initialized: await self.initialize() if not self._pool: raise StorageError("Connection pool not initialized", "POOL_NOT_READY") async with self._pool.acquire() as connection: try: yield connection except Exception as e: logger.error(f"Database operation failed: {e}") raise @asynccontextmanager async def get_transaction(self): """Get a database transaction""" async with self.get_connection() as conn: async with conn.transaction(): yield conn async def execute_query(self, query: str, *args) -> Any: """Execute a query and return results""" async with self.get_connection() as conn: return await conn.fetch(query, *args) async def execute_command(self, command: str, *args) -> str: """Execute a command and return status""" async with self.get_connection() as conn: return await conn.execute(command, *args) async def execute_many(self, command: str, args_list) -> None: """Execute a command multiple times with different arguments""" async with self.get_connection() as conn: await conn.executemany(command, args_list) async def health_check(self) -> bool: """Check database health""" try: async with self.get_connection() as conn: result = await conn.fetchval("SELECT 1") if result == 1: logger.debug("Database health check passed") return True else: logger.warning("Database health check returned unexpected result") return False except Exception as e: logger.error(f"Database health check failed: {e}") return False async def get_pool_stats(self) -> Dict[str, Any]: """Get connection pool statistics""" if not self._pool: return {} return { 'size': self._pool.get_size(), 'min_size': self._pool.get_min_size(), 'max_size': self._pool.get_max_size(), 'idle_size': self._pool.get_idle_size(), 'is_closing': self._pool.is_closing() } @property def is_initialized(self) -> bool: """Check if pool is initialized""" return self._is_initialized # Global connection pool instance db_pool = DatabaseConnectionPool()