Design Document: Unified Data Storage System

Overview

This design document outlines the architecture for unifying all data storage and retrieval methods in the trading system. The current system uses multiple fragmented approaches (Parquet files, pickle files, in-memory caches, and TimescaleDB), which creates complexity and inconsistency. The unified system will consolidate these into a single, efficient TimescaleDB-based storage backend with a clean, unified API.

Key Design Principles

  1. Single Source of Truth: TimescaleDB as the primary storage backend for all time-series data
  2. Unified Interface: One method (get_inference_data()) for all data retrieval needs
  3. Performance First: In-memory caching for real-time data, optimized queries for historical data
  4. Backward Compatibility: Seamless migration from existing storage formats
  5. Separation of Concerns: Clear boundaries between storage, caching, and business logic

Architecture

High-Level Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Application Layer                         │
│  (Models, Backtesting, Annotation, Dashboard)               │
└────────────────────┬────────────────────────────────────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────────────────┐
│              Unified Data Provider API                       │
│                                                              │
│  get_inference_data(symbol, timestamp=None, context_window) │
│  get_multi_timeframe_data(symbol, timeframes, timestamp)    │
│  get_order_book_data(symbol, timestamp, aggregation)        │
└────────────────────┬────────────────────────────────────────┘
                     │
        ┌────────────┴────────────┐
        ▼                         ▼
┌──────────────────┐    ┌──────────────────┐
│  Cache Layer     │    │  Storage Layer   │
│  (In-Memory)     │    │  (TimescaleDB)   │
│                  │    │                  │
│  - Last 5 min    │    │  - OHLCV Data    │
│  - Real-time     │    │  - Order Book    │
│  - Low latency   │    │  - Trade Data    │
└──────────────────┘    │  - Aggregations  │
                        └──────────────────┘

Data Flow

Real-Time Data Flow:
WebSocket → Tick Aggregator → Cache Layer → TimescaleDB (async)
                                    ↓
                              Application (fast read)

Historical Data Flow:
Application → Unified API → TimescaleDB → Cache (optional) → Application

Components and Interfaces

1. Unified Data Provider

The central component that provides a single interface for all data access.

class UnifiedDataProvider:
    """
    Unified interface for all market data access.
    Handles both real-time and historical data retrieval.
    """
    
    def __init__(self, db_connection_pool, cache_manager):
        self.db = db_connection_pool
        self.cache = cache_manager
        self.symbols = ['ETH/USDT', 'BTC/USDT']
        self.timeframes = ['1s', '1m', '5m', '15m', '1h', '1d']
    
    async def get_inference_data(
        self,
        symbol: str,
        timestamp: Optional[datetime] = None,
        context_window_minutes: int = 5
    ) -> InferenceDataFrame:
        """
        Get complete inference data for a symbol at a specific time.
        
        Args:
            symbol: Trading symbol (e.g., 'ETH/USDT')
            timestamp: Target timestamp (None = latest real-time data)
            context_window_minutes: Minutes of context data before/after timestamp
            
        Returns:
            InferenceDataFrame with OHLCV, indicators, COB data, imbalances
        """
        
    async def get_multi_timeframe_data(
        self,
        symbol: str,
        timeframes: List[str],
        timestamp: Optional[datetime] = None,
        limit: int = 100
    ) -> Dict[str, pd.DataFrame]:
        """
        Get aligned multi-timeframe candlestick data.
        
        Args:
            symbol: Trading symbol
            timeframes: List of timeframes to retrieve
            timestamp: Target timestamp (None = latest)
            limit: Number of candles per timeframe
            
        Returns:
            Dictionary mapping timeframe to DataFrame
        """
    
    async def get_order_book_data(
        self,
        symbol: str,
        timestamp: Optional[datetime] = None,
        aggregation: str = '1s',
        limit: int = 300
    ) -> OrderBookDataFrame:
        """
        Get order book data with imbalance metrics.
        
        Args:
            symbol: Trading symbol
            timestamp: Target timestamp (None = latest)
            aggregation: Aggregation level ('raw', '1s', '1m')
            limit: Number of data points
            
        Returns:
            OrderBookDataFrame with bids, asks, imbalances
        """

2. Storage Layer (TimescaleDB)

TimescaleDB schema and access patterns.

Database Schema

-- OHLCV Data (Hypertable)
CREATE TABLE ohlcv_data (
    timestamp TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    timeframe VARCHAR(10) NOT NULL,
    open_price DECIMAL(20,8) NOT NULL,
    high_price DECIMAL(20,8) NOT NULL,
    low_price DECIMAL(20,8) NOT NULL,
    close_price DECIMAL(20,8) NOT NULL,
    volume DECIMAL(30,8) NOT NULL,
    trade_count INTEGER,
    -- Technical Indicators (pre-calculated)
    rsi_14 DECIMAL(10,4),
    macd DECIMAL(20,8),
    macd_signal DECIMAL(20,8),
    bb_upper DECIMAL(20,8),
    bb_middle DECIMAL(20,8),
    bb_lower DECIMAL(20,8),
    PRIMARY KEY (timestamp, symbol, timeframe)
);

SELECT create_hypertable('ohlcv_data', 'timestamp');
CREATE INDEX idx_ohlcv_symbol_tf ON ohlcv_data (symbol, timeframe, timestamp DESC);

-- Order Book Snapshots (Hypertable)
CREATE TABLE order_book_snapshots (
    timestamp TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    exchange VARCHAR(20) NOT NULL,
    bids JSONB NOT NULL,  -- Top 50 levels
    asks JSONB NOT NULL,  -- Top 50 levels
    mid_price DECIMAL(20,8),
    spread DECIMAL(20,8),
    bid_volume DECIMAL(30,8),
    ask_volume DECIMAL(30,8),
    PRIMARY KEY (timestamp, symbol, exchange)
);

SELECT create_hypertable('order_book_snapshots', 'timestamp');
CREATE INDEX idx_obs_symbol ON order_book_snapshots (symbol, timestamp DESC);

-- Order Book Aggregated 1s (Hypertable)
CREATE TABLE order_book_1s_agg (
    timestamp TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    price_bucket DECIMAL(20,2) NOT NULL,  -- $1 buckets
    bid_volume DECIMAL(30,8),
    ask_volume DECIMAL(30,8),
    bid_count INTEGER,
    ask_count INTEGER,
    imbalance DECIMAL(10,6),
    PRIMARY KEY (timestamp, symbol, price_bucket)
);

SELECT create_hypertable('order_book_1s_agg', 'timestamp');
CREATE INDEX idx_ob1s_symbol ON order_book_1s_agg (symbol, timestamp DESC);

-- Order Book Imbalances (Hypertable)
CREATE TABLE order_book_imbalances (
    timestamp TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    imbalance_1s DECIMAL(10,6),
    imbalance_5s DECIMAL(10,6),
    imbalance_15s DECIMAL(10,6),
    imbalance_60s DECIMAL(10,6),
    volume_imbalance_1s DECIMAL(10,6),
    volume_imbalance_5s DECIMAL(10,6),
    volume_imbalance_15s DECIMAL(10,6),
    volume_imbalance_60s DECIMAL(10,6),
    price_range DECIMAL(10,2),
    PRIMARY KEY (timestamp, symbol)
);

SELECT create_hypertable('order_book_imbalances', 'timestamp');
CREATE INDEX idx_obi_symbol ON order_book_imbalances (symbol, timestamp DESC);

-- Trade Events (Hypertable)
CREATE TABLE trade_events (
    timestamp TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    exchange VARCHAR(20) NOT NULL,
    price DECIMAL(20,8) NOT NULL,
    size DECIMAL(30,8) NOT NULL,
    side VARCHAR(4) NOT NULL,
    trade_id VARCHAR(100) NOT NULL,
    PRIMARY KEY (timestamp, symbol, exchange, trade_id)
);

SELECT create_hypertable('trade_events', 'timestamp');
CREATE INDEX idx_trades_symbol ON trade_events (symbol, timestamp DESC);

Continuous Aggregates

-- 1m OHLCV from 1s data
CREATE MATERIALIZED VIEW ohlcv_1m_continuous
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('1 minute', timestamp) AS timestamp,
    symbol,
    '1m' AS timeframe,
    first(open_price, timestamp) AS open_price,
    max(high_price) AS high_price,
    min(low_price) AS low_price,
    last(close_price, timestamp) AS close_price,
    sum(volume) AS volume,
    sum(trade_count) AS trade_count
FROM ohlcv_data
WHERE timeframe = '1s'
GROUP BY time_bucket('1 minute', timestamp), symbol;

-- 5m OHLCV from 1m data
CREATE MATERIALIZED VIEW ohlcv_5m_continuous
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('5 minutes', timestamp) AS timestamp,
    symbol,
    '5m' AS timeframe,
    first(open_price, timestamp) AS open_price,
    max(high_price) AS high_price,
    min(low_price) AS low_price,
    last(close_price, timestamp) AS close_price,
    sum(volume) AS volume,
    sum(trade_count) AS trade_count
FROM ohlcv_data
WHERE timeframe = '1m'
GROUP BY time_bucket('5 minutes', timestamp), symbol;

-- Similar continuous aggregates are defined for 15m, 1h, and 1d.
-- Each aggregate also needs a refresh policy (add_continuous_aggregate_policy)
-- so it stays up to date automatically.

Compression Policies

-- Enable compression on each hypertable, then compress data older than the given age
ALTER TABLE ohlcv_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol, timeframe');
ALTER TABLE order_book_snapshots SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');
ALTER TABLE order_book_1s_agg SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');
ALTER TABLE order_book_imbalances SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');
ALTER TABLE trade_events SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol');

SELECT add_compression_policy('ohlcv_data', INTERVAL '7 days');
SELECT add_compression_policy('order_book_snapshots', INTERVAL '1 day');
SELECT add_compression_policy('order_book_1s_agg', INTERVAL '2 days');
SELECT add_compression_policy('order_book_imbalances', INTERVAL '2 days');
SELECT add_compression_policy('trade_events', INTERVAL '7 days');

Retention Policies

-- Retain data for specified periods
SELECT add_retention_policy('order_book_snapshots', INTERVAL '30 days');
SELECT add_retention_policy('order_book_1s_agg', INTERVAL '60 days');
SELECT add_retention_policy('order_book_imbalances', INTERVAL '60 days');
SELECT add_retention_policy('trade_events', INTERVAL '90 days');
SELECT add_retention_policy('ohlcv_data', INTERVAL '2 years');
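
As a sketch of the read path against this schema (assuming an asyncpg connection pool; the helper name fetch_candles is illustrative), a point-in-time candle query looks like:

import asyncpg
import pandas as pd
from datetime import datetime

async def fetch_candles(pool: asyncpg.Pool, symbol: str, timeframe: str,
                        timestamp: datetime, limit: int = 100) -> pd.DataFrame:
    """Fetch up to `limit` candles at or before `timestamp` for one timeframe."""
    rows = await pool.fetch(
        """
        SELECT timestamp, open_price, high_price, low_price, close_price, volume
        FROM ohlcv_data
        WHERE symbol = $1 AND timeframe = $2 AND timestamp <= $3
        ORDER BY timestamp DESC
        LIMIT $4
        """,
        symbol, timeframe, timestamp, limit
    )
    df = pd.DataFrame([dict(r) for r in rows])
    # Return in ascending time order for model consumption
    return df.sort_values('timestamp').reset_index(drop=True) if not df.empty else df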

3. Cache Layer

In-memory caching for low-latency real-time data access.

class DataCacheManager:
    """
    Manages in-memory cache for real-time data.
    Provides <10ms latency for latest data access.
    """
    
    def __init__(self, cache_duration_seconds: int = 300):
        # Cache last 5 minutes of data
        self.cache_duration = cache_duration_seconds
        
        # In-memory storage
        self.ohlcv_cache: Dict[str, Dict[str, deque]] = {}
        self.orderbook_cache: Dict[str, deque] = {}
        self.imbalance_cache: Dict[str, deque] = {}
        self.trade_cache: Dict[str, deque] = {}
        
        # Cache statistics
        self.cache_hits = 0
        self.cache_misses = 0
        
    def add_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict):
        """Add OHLCV candle to cache"""
        
    def add_orderbook_snapshot(self, symbol: str, snapshot: Dict):
        """Add order book snapshot to cache"""
        
    def add_imbalance_data(self, symbol: str, imbalance: Dict):
        """Add imbalance metrics to cache"""
        
    def get_latest_ohlcv(self, symbol: str, timeframe: str, limit: int = 100) -> List[Dict]:
        """Get latest OHLCV candles from cache"""
        
    def get_latest_orderbook(self, symbol: str) -> Optional[Dict]:
        """Get latest order book snapshot from cache"""
        
    def get_latest_imbalances(self, symbol: str, limit: int = 60) -> List[Dict]:
        """Get latest imbalance metrics from cache"""
        
    def evict_old_data(self):
        """Remove data older than cache duration"""

4. Data Models

Standardized data structures for all components.

@dataclass
class InferenceDataFrame:
    """Complete inference data for a single timestamp"""
    symbol: str
    timestamp: datetime
    
    # Multi-timeframe OHLCV
    ohlcv_1s: pd.DataFrame
    ohlcv_1m: pd.DataFrame
    ohlcv_5m: pd.DataFrame
    ohlcv_15m: pd.DataFrame
    ohlcv_1h: pd.DataFrame
    ohlcv_1d: pd.DataFrame
    
    # Order book data
    orderbook_snapshot: Optional[Dict]
    orderbook_1s_agg: pd.DataFrame
    
    # Imbalance metrics
    imbalances: pd.DataFrame  # Multi-timeframe imbalances
    
    # Technical indicators (pre-calculated)
    indicators: Dict[str, float]
    
    # Context window data (±N minutes)
    context_data: Optional[pd.DataFrame]
    
    # Metadata
    data_source: str  # 'cache' or 'database'
    query_latency_ms: float

@dataclass
class OrderBookDataFrame:
    """Order book data with imbalances"""
    symbol: str
    timestamp: datetime
    
    # Raw order book
    bids: List[Tuple[float, float]]  # (price, size)
    asks: List[Tuple[float, float]]
    
    # Aggregated data
    price_buckets: pd.DataFrame  # $1 buckets
    
    # Imbalance metrics
    imbalance_1s: float
    imbalance_5s: float
    imbalance_15s: float
    imbalance_60s: float
    
    # Volume-weighted imbalances
    volume_imbalance_1s: float
    volume_imbalance_5s: float
    volume_imbalance_15s: float
    volume_imbalance_60s: float
    
    # Statistics
    mid_price: float
    spread: float
    bid_volume: float
    ask_volume: float
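
The imbalance fields above are not formally defined in this document; a common definition, assumed here as a sketch, is the normalized bid/ask volume difference over the top N levels:

from typing import List, Tuple

def compute_imbalance(bids: List[Tuple[float, float]],
                      asks: List[Tuple[float, float]],
                      levels: int = 50) -> float:
    """(bid_volume - ask_volume) / (bid_volume + ask_volume) over the top `levels`.
    Result is in [-1, 1]; positive values indicate bid-side pressure."""
    bid_vol = sum(size for _, size in bids[:levels])
    ask_vol = sum(size for _, size in asks[:levels])
    total = bid_vol + ask_vol
    return (bid_vol - ask_vol) / total if total > 0 else 0.0

# The timeframe variants (imbalance_1s, imbalance_5s, ...) would aggregate this
# value over the snapshots that fall inside the corresponding rolling window.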

5. Data Ingestion Pipeline

Real-time data ingestion with async persistence.

class DataIngestionPipeline:
    """
    Handles real-time data ingestion from WebSocket sources.
    Writes to cache immediately, persists to DB asynchronously.
    """
    
    def __init__(self, cache_manager, db_connection_pool):
        self.cache = cache_manager
        self.db = db_connection_pool
        
        # Batch write buffers
        self.ohlcv_buffer: List[Dict] = []
        self.orderbook_buffer: List[Dict] = []
        self.trade_buffer: List[Dict] = []
        
        # Batch write settings
        self.batch_size = 100
        self.batch_timeout_seconds = 5
        
    async def ingest_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict):
        """
        Ingest OHLCV candle.
        1. Add to cache immediately
        2. Buffer for batch write to DB
        """
        # Immediate cache write
        self.cache.add_ohlcv_candle(symbol, timeframe, candle)
        
        # Buffer for DB write
        self.ohlcv_buffer.append({
            'symbol': symbol,
            'timeframe': timeframe,
            **candle
        })
        
        # Flush if buffer full
        if len(self.ohlcv_buffer) >= self.batch_size:
            await self._flush_ohlcv_buffer()
    
    async def ingest_orderbook_snapshot(self, symbol: str, snapshot: Dict):
        """Ingest order book snapshot"""
        # Immediate cache write
        self.cache.add_orderbook_snapshot(symbol, snapshot)
        
        # Calculate and cache imbalances
        imbalances = self._calculate_imbalances(symbol, snapshot)
        self.cache.add_imbalance_data(symbol, imbalances)
        
        # Buffer for DB write
        self.orderbook_buffer.append({
            'symbol': symbol,
            **snapshot
        })
        
        # Flush if buffer full
        if len(self.orderbook_buffer) >= self.batch_size:
            await self._flush_orderbook_buffer()
    
    async def _flush_ohlcv_buffer(self):
        """Batch write OHLCV data to database"""
        if not self.ohlcv_buffer:
            return
        
        try:
            # Prepare batch insert
            values = [
                (
                    item['timestamp'],
                    item['symbol'],
                    item['timeframe'],
                    item['open'],
                    item['high'],
                    item['low'],
                    item['close'],
                    item['volume'],
                    item.get('trade_count', 0)
                )
                for item in self.ohlcv_buffer
            ]
            
            # Batch insert
            await self.db.executemany(
                """
                INSERT INTO ohlcv_data 
                (timestamp, symbol, timeframe, open_price, high_price, 
                 low_price, close_price, volume, trade_count)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (timestamp, symbol, timeframe) DO UPDATE
                SET close_price = EXCLUDED.close_price,
                    high_price = GREATEST(ohlcv_data.high_price, EXCLUDED.high_price),
                    low_price = LEAST(ohlcv_data.low_price, EXCLUDED.low_price),
                    volume = ohlcv_data.volume + EXCLUDED.volume,
                    trade_count = ohlcv_data.trade_count + EXCLUDED.trade_count
                """,
                values
            )
            
            # Clear buffer
            self.ohlcv_buffer.clear()
            
        except Exception as e:
            logger.error(f"Error flushing OHLCV buffer: {e}")

6. Migration System

Migrate existing Parquet/pickle data to TimescaleDB.

class DataMigrationManager:
    """
    Migrates existing data from Parquet/pickle files to TimescaleDB.
    Ensures data integrity and provides rollback capability.
    """
    
    def __init__(self, db_connection_pool, cache_dir: Path):
        self.db = db_connection_pool
        self.cache_dir = cache_dir
        
    async def migrate_all_data(self):
        """Migrate all existing data to TimescaleDB"""
        logger.info("Starting data migration to TimescaleDB")
        
        # Migrate OHLCV data from Parquet files
        await self._migrate_ohlcv_data()
        
        # Migrate order book data if exists
        await self._migrate_orderbook_data()
        
        # Verify migration
        await self._verify_migration()
        
        logger.info("Data migration completed successfully")
    
    async def _migrate_ohlcv_data(self):
        """Migrate OHLCV data from Parquet files"""
        parquet_files = list(self.cache_dir.glob("*.parquet"))
        
        for parquet_file in parquet_files:
            try:
                # Parse filename: ETHUSDT_1m.parquet
                filename = parquet_file.stem
                parts = filename.split('_')
                
                if len(parts) != 2:
                    continue
                
                symbol_raw = parts[0]
                timeframe = parts[1]
                
                # Convert symbol format
                symbol = self._convert_symbol_format(symbol_raw)
                
                # Read Parquet file
                df = pd.read_parquet(parquet_file)
                
                # Migrate data in batches
                await self._migrate_ohlcv_batch(symbol, timeframe, df)
                
                logger.info(f"Migrated {len(df)} rows from {parquet_file.name}")
                
            except Exception as e:
                logger.error(f"Error migrating {parquet_file}: {e}")
    
    async def _migrate_ohlcv_batch(self, symbol: str, timeframe: str, df: pd.DataFrame):
        """Migrate a batch of OHLCV data"""
        # Prepare data for insertion
        values = []
        for idx, row in df.iterrows():
            values.append((
                row['timestamp'],
                symbol,
                timeframe,
                row['open'],
                row['high'],
                row['low'],
                row['close'],
                row['volume'],
                row.get('trade_count', 0)
            ))
        
        # Batch insert
        await self.db.executemany(
            """
            INSERT INTO ohlcv_data 
            (timestamp, symbol, timeframe, open_price, high_price, 
             low_price, close_price, volume, trade_count)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ON CONFLICT (timestamp, symbol, timeframe) DO NOTHING
            """,
            values
        )
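
The _convert_symbol_format() helper used above is not defined in this document; a plausible sketch, assuming cache filenames use the concatenated exchange form (e.g. ETHUSDT):

def _convert_symbol_format(self, symbol_raw: str) -> str:
    """Convert 'ETHUSDT' -> 'ETH/USDT'. Sketch: handles only the quote
    currencies this system is expected to use."""
    for quote in ('USDT', 'USDC', 'BUSD', 'BTC', 'USD'):
        if symbol_raw.endswith(quote) and len(symbol_raw) > len(quote):
            return f"{symbol_raw[:-len(quote)]}/{quote}"
    return symbol_raw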

Error Handling

Data Validation

class DataValidator:
    """Validates all incoming data before storage"""
    
    @staticmethod
    def validate_ohlcv(candle: Dict) -> bool:
        """Validate OHLCV candle data"""
        try:
            # Check required fields
            required = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
            if not all(field in candle for field in required):
                return False
            
            # Validate OHLC relationships
            if candle['high'] < candle['low']:
                logger.warning(f"Invalid OHLCV: high < low")
                return False
            
            if candle['high'] < candle['open'] or candle['high'] < candle['close']:
                logger.warning(f"Invalid OHLCV: high < open/close")
                return False
            
            if candle['low'] > candle['open'] or candle['low'] > candle['close']:
                logger.warning(f"Invalid OHLCV: low > open/close")
                return False
            
            # Validate positive volume
            if candle['volume'] < 0:
                logger.warning(f"Invalid OHLCV: negative volume")
                return False
            
            return True
            
        except Exception as e:
            logger.error(f"Error validating OHLCV: {e}")
            return False
    
    @staticmethod
    def validate_orderbook(orderbook: Dict) -> bool:
        """Validate order book data"""
        try:
            # Check required fields
            if 'bids' not in orderbook or 'asks' not in orderbook:
                return False
            
            # Validate bid/ask relationship
            if orderbook['bids'] and orderbook['asks']:
                best_bid = max(bid[0] for bid in orderbook['bids'])
                best_ask = min(ask[0] for ask in orderbook['asks'])
                
                if best_bid >= best_ask:
                    logger.warning(f"Invalid orderbook: bid >= ask")
                    return False
            
            return True
            
        except Exception as e:
            logger.error(f"Error validating orderbook: {e}")
            return False
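
A short usage sketch showing how the validator is intended to gate ingestion (wiring is illustrative):

# At the top of DataIngestionPipeline.ingest_ohlcv_candle (sketch):
if not DataValidator.validate_ohlcv(candle):
    logger.warning(f"Dropping invalid candle for {symbol} {timeframe}")
    return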

Retry Logic

class RetryableDBOperation:
    """Wrapper for database operations with retry logic"""
    
    @staticmethod
    async def execute_with_retry(
        operation: Callable,
        max_retries: int = 3,
        backoff_seconds: float = 1.0
    ):
        """Execute database operation with exponential backoff retry"""
        for attempt in range(max_retries):
            try:
                return await operation()
            except Exception as e:
                if attempt == max_retries - 1:
                    logger.error(f"Operation failed after {max_retries} attempts: {e}")
                    raise
                
                wait_time = backoff_seconds * (2 ** attempt)
                logger.warning(f"Operation failed (attempt {attempt + 1}), retrying in {wait_time}s: {e}")
                await asyncio.sleep(wait_time)
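
A usage sketch, wrapping the ingestion pipeline's batch flush (names as defined earlier in this document):

# Flush with retry instead of calling the pipeline method directly (sketch)
await RetryableDBOperation.execute_with_retry(
    lambda: pipeline._flush_ohlcv_buffer(),
    max_retries=3,
    backoff_seconds=1.0
)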

Testing Strategy

Unit Tests

  1. Data Validation Tests

    • Test OHLCV validation logic (see the pytest sketch after this list)
    • Test order book validation logic
    • Test timestamp validation and timezone handling
  2. Cache Manager Tests

    • Test cache insertion and retrieval
    • Test cache eviction logic
    • Test cache hit/miss statistics
  3. Data Model Tests

    • Test InferenceDataFrame creation
    • Test OrderBookDataFrame creation
    • Test data serialization/deserialization
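
A minimal pytest sketch for the OHLCV validation cases (the import path for DataValidator is illustrative):

from datetime import datetime, timezone
# from core.unified_storage import DataValidator  # illustrative import path

def make_candle(**overrides):
    base = {'timestamp': datetime.now(timezone.utc),
            'open': 100.0, 'high': 105.0, 'low': 99.0,
            'close': 102.0, 'volume': 10.0}
    base.update(overrides)
    return base

def test_valid_candle_passes():
    assert DataValidator.validate_ohlcv(make_candle())

def test_high_below_low_rejected():
    assert not DataValidator.validate_ohlcv(make_candle(high=98.0))

def test_negative_volume_rejected():
    assert not DataValidator.validate_ohlcv(make_candle(volume=-1.0))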

Integration Tests

  1. Database Integration Tests

    • Test TimescaleDB connection and queries
    • Test batch insert operations
    • Test continuous aggregates
    • Test compression and retention policies
  2. End-to-End Data Flow Tests

    • Test real-time data ingestion → cache → database
    • Test historical data retrieval from database
    • Test multi-timeframe data alignment
  3. Migration Tests

    • Test Parquet file migration
    • Test data integrity after migration
    • Test rollback capability

Performance Tests

  1. Latency Tests

    • Cache read latency (<10ms target)
    • Database query latency (<100ms target)
    • Batch write throughput (>1000 ops/sec target)
  2. Load Tests

    • Concurrent read/write operations
    • High-frequency data ingestion
    • Large time-range queries
  3. Storage Tests

    • Compression ratio validation (>80% target)
    • Storage growth over time
    • Query performance with compressed data

Performance Optimization

Query Optimization

-- Use time_bucket for efficient time-range queries
SELECT 
    time_bucket('1 minute', timestamp) AS bucket,
    symbol,
    first(close_price, timestamp) AS price
FROM ohlcv_data
WHERE symbol = 'ETH/USDT'
  AND timeframe = '1s'
  AND timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY bucket, symbol
ORDER BY bucket DESC;

-- Use indexes for symbol-based queries
CREATE INDEX CONCURRENTLY idx_ohlcv_symbol_tf_ts 
ON ohlcv_data (symbol, timeframe, timestamp DESC);

Caching Strategy

  1. Hot Data: Last 5 minutes in memory (all symbols, all timeframes)
  2. Warm Data: Last 1 hour in TimescaleDB uncompressed
  3. Cold Data: Older than 1 hour in TimescaleDB compressed

Batch Operations

  • Batch size: 100 records or 5 seconds (whichever comes first)
  • Use executemany() for bulk inserts
  • Use the COPY command for large migrations (see the sketch below)
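
For the COPY path, asyncpg exposes copy_records_to_table(); a sketch of how a migration batch could be bulk-loaded with it instead of executemany() (note that COPY has no ON CONFLICT handling, so it suits initial backfill into empty ranges):

# Inside DataMigrationManager (sketch)
async def _copy_ohlcv_batch(self, records: list):
    async with self.db.acquire() as conn:
        await conn.copy_records_to_table(
            'ohlcv_data',
            records=records,
            columns=['timestamp', 'symbol', 'timeframe', 'open_price',
                     'high_price', 'low_price', 'close_price', 'volume',
                     'trade_count']
        )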

Deployment Considerations

Database Setup

  1. Install TimescaleDB extension
  2. Run schema creation scripts
  3. Create hypertables and indexes
  4. Set up continuous aggregates
  5. Configure compression and retention policies

Migration Process

  1. Phase 1: Deploy new code with dual-write (Parquet + TimescaleDB)
  2. Phase 2: Run migration script to backfill historical data
  3. Phase 3: Verify data integrity
  4. Phase 4: Switch reads to TimescaleDB
  5. Phase 5: Deprecate Parquet writes
  6. Phase 6: Archive old Parquet files

Monitoring

  1. Database Metrics

    • Query latency (p50, p95, p99)
    • Write throughput
    • Storage size and compression ratio
    • Connection pool utilization
  2. Cache Metrics

    • Hit/miss ratio
    • Cache size
    • Eviction rate
  3. Application Metrics

    • Data retrieval latency
    • Error rates
    • Data validation failures

Security Considerations

  1. Database Access

    • Use connection pooling with proper credentials
    • Implement read-only users for query-only operations
    • Use SSL/TLS for database connections
  2. Data Validation

    • Validate all incoming data before storage
    • Sanitize inputs to prevent SQL injection
    • Implement rate limiting for API endpoints
  3. Backup and Recovery

    • Regular database backups (daily)
    • Point-in-time recovery capability
    • Disaster recovery plan

Future Enhancements

  1. Multi-Exchange Support

    • Store data from multiple exchanges
    • Cross-exchange arbitrage analysis
    • Exchange-specific data normalization
  2. Advanced Analytics

    • Real-time pattern detection
    • Anomaly detection
    • Predictive analytics
  3. Distributed Storage

    • Horizontal scaling with TimescaleDB clustering
    • Read replicas for query load distribution
    • Geographic distribution for low-latency access