# Design Document: Unified Data Storage System ## Overview This design document outlines the architecture for unifying all data storage and retrieval methods in the trading system. The current system uses multiple fragmented approaches (Parquet files, pickle files, in-memory caches, and TimescaleDB) which creates complexity and inconsistency. The unified system will consolidate these into a single, efficient TimescaleDB-based storage backend with a clean, unified API. ### Key Design Principles 1. **Single Source of Truth**: TimescaleDB as the primary storage backend for all time-series data 2. **Unified Interface**: One method (`get_inference_data()`) for all data retrieval needs 3. **Performance First**: In-memory caching for real-time data, optimized queries for historical data 4. **Backward Compatibility**: Seamless migration from existing storage formats 5. **Separation of Concerns**: Clear boundaries between storage, caching, and business logic ## Architecture ### High-Level Architecture ``` ┌─────────────────────────────────────────────────────────────┐ │ Application Layer │ │ (Models, Backtesting, Annotation, Dashboard) │ └────────────────────┬────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ Unified Data Provider API │ │ │ │ get_inference_data(symbol, timestamp=None, context_window) │ │ get_multi_timeframe_data(symbol, timeframes, timestamp) │ │ get_order_book_data(symbol, timestamp, aggregation) │ └────────────────────┬────────────────────────────────────────┘ │ ┌────────────┴────────────┐ ▼ ▼ ┌──────────────────┐ ┌──────────────────┐ │ Cache Layer │ │ Storage Layer │ │ (In-Memory) │ │ (TimescaleDB) │ │ │ │ │ │ - Last 5 min │ │ - OHLCV Data │ │ - Real-time │ │ - Order Book │ │ - Low latency │ │ - Trade Data │ └──────────────────┘ │ - Aggregations │ └──────────────────┘ ``` ### Data Flow ``` Real-Time Data Flow: WebSocket → Tick Aggregator → Cache Layer → TimescaleDB (async) ↓ Application (fast read) Historical Data Flow: Application → Unified API → TimescaleDB → Cache (optional) → Application ``` ## Components and Interfaces ### 1. Unified Data Provider The central component that provides a single interface for all data access. ```python class UnifiedDataProvider: """ Unified interface for all market data access. Handles both real-time and historical data retrieval. """ def __init__(self, db_connection_pool, cache_manager): self.db = db_connection_pool self.cache = cache_manager self.symbols = ['ETH/USDT', 'BTC/USDT'] self.timeframes = ['1s', '1m', '5m', '15m', '1h', '1d'] async def get_inference_data( self, symbol: str, timestamp: Optional[datetime] = None, context_window_minutes: int = 5 ) -> InferenceDataFrame: """ Get complete inference data for a symbol at a specific time. Args: symbol: Trading symbol (e.g., 'ETH/USDT') timestamp: Target timestamp (None = latest real-time data) context_window_minutes: Minutes of context data before/after timestamp Returns: InferenceDataFrame with OHLCV, indicators, COB data, imbalances """ async def get_multi_timeframe_data( self, symbol: str, timeframes: List[str], timestamp: Optional[datetime] = None, limit: int = 100 ) -> Dict[str, pd.DataFrame]: """ Get aligned multi-timeframe candlestick data. Args: symbol: Trading symbol timeframes: List of timeframes to retrieve timestamp: Target timestamp (None = latest) limit: Number of candles per timeframe Returns: Dictionary mapping timeframe to DataFrame """ async def get_order_book_data( self, symbol: str, timestamp: Optional[datetime] = None, aggregation: str = '1s', limit: int = 300 ) -> OrderBookDataFrame: """ Get order book data with imbalance metrics. Args: symbol: Trading symbol timestamp: Target timestamp (None = latest) aggregation: Aggregation level ('raw', '1s', '1m') limit: Number of data points Returns: OrderBookDataFrame with bids, asks, imbalances """ ``` ### 2. Storage Layer (TimescaleDB) TimescaleDB schema and access patterns. #### Database Schema ```sql -- OHLCV Data (Hypertable) CREATE TABLE ohlcv_data ( timestamp TIMESTAMPTZ NOT NULL, symbol VARCHAR(20) NOT NULL, timeframe VARCHAR(10) NOT NULL, open_price DECIMAL(20,8) NOT NULL, high_price DECIMAL(20,8) NOT NULL, low_price DECIMAL(20,8) NOT NULL, close_price DECIMAL(20,8) NOT NULL, volume DECIMAL(30,8) NOT NULL, trade_count INTEGER, -- Technical Indicators (pre-calculated) rsi_14 DECIMAL(10,4), macd DECIMAL(20,8), macd_signal DECIMAL(20,8), bb_upper DECIMAL(20,8), bb_middle DECIMAL(20,8), bb_lower DECIMAL(20,8), PRIMARY KEY (timestamp, symbol, timeframe) ); SELECT create_hypertable('ohlcv_data', 'timestamp'); CREATE INDEX idx_ohlcv_symbol_tf ON ohlcv_data (symbol, timeframe, timestamp DESC); -- Order Book Snapshots (Hypertable) CREATE TABLE order_book_snapshots ( timestamp TIMESTAMPTZ NOT NULL, symbol VARCHAR(20) NOT NULL, exchange VARCHAR(20) NOT NULL, bids JSONB NOT NULL, -- Top 50 levels asks JSONB NOT NULL, -- Top 50 levels mid_price DECIMAL(20,8), spread DECIMAL(20,8), bid_volume DECIMAL(30,8), ask_volume DECIMAL(30,8), PRIMARY KEY (timestamp, symbol, exchange) ); SELECT create_hypertable('order_book_snapshots', 'timestamp'); CREATE INDEX idx_obs_symbol ON order_book_snapshots (symbol, timestamp DESC); -- Order Book Aggregated 1s (Hypertable) CREATE TABLE order_book_1s_agg ( timestamp TIMESTAMPTZ NOT NULL, symbol VARCHAR(20) NOT NULL, price_bucket DECIMAL(20,2) NOT NULL, -- $1 buckets bid_volume DECIMAL(30,8), ask_volume DECIMAL(30,8), bid_count INTEGER, ask_count INTEGER, imbalance DECIMAL(10,6), PRIMARY KEY (timestamp, symbol, price_bucket) ); SELECT create_hypertable('order_book_1s_agg', 'timestamp'); CREATE INDEX idx_ob1s_symbol ON order_book_1s_agg (symbol, timestamp DESC); -- Order Book Imbalances (Hypertable) CREATE TABLE order_book_imbalances ( timestamp TIMESTAMPTZ NOT NULL, symbol VARCHAR(20) NOT NULL, imbalance_1s DECIMAL(10,6), imbalance_5s DECIMAL(10,6), imbalance_15s DECIMAL(10,6), imbalance_60s DECIMAL(10,6), volume_imbalance_1s DECIMAL(10,6), volume_imbalance_5s DECIMAL(10,6), volume_imbalance_15s DECIMAL(10,6), volume_imbalance_60s DECIMAL(10,6), price_range DECIMAL(10,2), PRIMARY KEY (timestamp, symbol) ); SELECT create_hypertable('order_book_imbalances', 'timestamp'); CREATE INDEX idx_obi_symbol ON order_book_imbalances (symbol, timestamp DESC); -- Trade Events (Hypertable) CREATE TABLE trade_events ( timestamp TIMESTAMPTZ NOT NULL, symbol VARCHAR(20) NOT NULL, exchange VARCHAR(20) NOT NULL, price DECIMAL(20,8) NOT NULL, size DECIMAL(30,8) NOT NULL, side VARCHAR(4) NOT NULL, trade_id VARCHAR(100) NOT NULL, PRIMARY KEY (timestamp, symbol, exchange, trade_id) ); SELECT create_hypertable('trade_events', 'timestamp'); CREATE INDEX idx_trades_symbol ON trade_events (symbol, timestamp DESC); ``` #### Continuous Aggregates ```sql -- 1m OHLCV from 1s data CREATE MATERIALIZED VIEW ohlcv_1m_continuous WITH (timescaledb.continuous) AS SELECT time_bucket('1 minute', timestamp) AS timestamp, symbol, '1m' AS timeframe, first(open_price, timestamp) AS open_price, max(high_price) AS high_price, min(low_price) AS low_price, last(close_price, timestamp) AS close_price, sum(volume) AS volume, sum(trade_count) AS trade_count FROM ohlcv_data WHERE timeframe = '1s' GROUP BY time_bucket('1 minute', timestamp), symbol; -- 5m OHLCV from 1m data CREATE MATERIALIZED VIEW ohlcv_5m_continuous WITH (timescaledb.continuous) AS SELECT time_bucket('5 minutes', timestamp) AS timestamp, symbol, '5m' AS timeframe, first(open_price, timestamp) AS open_price, max(high_price) AS high_price, min(low_price) AS low_price, last(close_price, timestamp) AS close_price, sum(volume) AS volume, sum(trade_count) AS trade_count FROM ohlcv_data WHERE timeframe = '1m' GROUP BY time_bucket('5 minutes', timestamp), symbol; -- Similar for 15m, 1h, 1d ``` #### Compression Policies ```sql -- Compress data older than 7 days SELECT add_compression_policy('ohlcv_data', INTERVAL '7 days'); SELECT add_compression_policy('order_book_snapshots', INTERVAL '1 day'); SELECT add_compression_policy('order_book_1s_agg', INTERVAL '2 days'); SELECT add_compression_policy('order_book_imbalances', INTERVAL '2 days'); SELECT add_compression_policy('trade_events', INTERVAL '7 days'); ``` #### Retention Policies ```sql -- Retain data for specified periods SELECT add_retention_policy('order_book_snapshots', INTERVAL '30 days'); SELECT add_retention_policy('order_book_1s_agg', INTERVAL '60 days'); SELECT add_retention_policy('order_book_imbalances', INTERVAL '60 days'); SELECT add_retention_policy('trade_events', INTERVAL '90 days'); SELECT add_retention_policy('ohlcv_data', INTERVAL '2 years'); ``` ### 3. Cache Layer In-memory caching for low-latency real-time data access. ```python class DataCacheManager: """ Manages in-memory cache for real-time data. Provides <10ms latency for latest data access. """ def __init__(self, cache_duration_seconds: int = 300): # Cache last 5 minutes of data self.cache_duration = cache_duration_seconds # In-memory storage self.ohlcv_cache: Dict[str, Dict[str, deque]] = {} self.orderbook_cache: Dict[str, deque] = {} self.imbalance_cache: Dict[str, deque] = {} self.trade_cache: Dict[str, deque] = {} # Cache statistics self.cache_hits = 0 self.cache_misses = 0 def add_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict): """Add OHLCV candle to cache""" def add_orderbook_snapshot(self, symbol: str, snapshot: Dict): """Add order book snapshot to cache""" def add_imbalance_data(self, symbol: str, imbalance: Dict): """Add imbalance metrics to cache""" def get_latest_ohlcv(self, symbol: str, timeframe: str, limit: int = 100) -> List[Dict]: """Get latest OHLCV candles from cache""" def get_latest_orderbook(self, symbol: str) -> Optional[Dict]: """Get latest order book snapshot from cache""" def get_latest_imbalances(self, symbol: str, limit: int = 60) -> List[Dict]: """Get latest imbalance metrics from cache""" def evict_old_data(self): """Remove data older than cache duration""" ``` ### 4. Data Models Standardized data structures for all components. ```python @dataclass class InferenceDataFrame: """Complete inference data for a single timestamp""" symbol: str timestamp: datetime # Multi-timeframe OHLCV ohlcv_1s: pd.DataFrame ohlcv_1m: pd.DataFrame ohlcv_5m: pd.DataFrame ohlcv_15m: pd.DataFrame ohlcv_1h: pd.DataFrame ohlcv_1d: pd.DataFrame # Order book data orderbook_snapshot: Optional[Dict] orderbook_1s_agg: pd.DataFrame # Imbalance metrics imbalances: pd.DataFrame # Multi-timeframe imbalances # Technical indicators (pre-calculated) indicators: Dict[str, float] # Context window data (±N minutes) context_data: Optional[pd.DataFrame] # Metadata data_source: str # 'cache' or 'database' query_latency_ms: float @dataclass class OrderBookDataFrame: """Order book data with imbalances""" symbol: str timestamp: datetime # Raw order book bids: List[Tuple[float, float]] # (price, size) asks: List[Tuple[float, float]] # Aggregated data price_buckets: pd.DataFrame # $1 buckets # Imbalance metrics imbalance_1s: float imbalance_5s: float imbalance_15s: float imbalance_60s: float # Volume-weighted imbalances volume_imbalance_1s: float volume_imbalance_5s: float volume_imbalance_15s: float volume_imbalance_60s: float # Statistics mid_price: float spread: float bid_volume: float ask_volume: float ``` ### 5. Data Ingestion Pipeline Real-time data ingestion with async persistence. ```python class DataIngestionPipeline: """ Handles real-time data ingestion from WebSocket sources. Writes to cache immediately, persists to DB asynchronously. """ def __init__(self, cache_manager, db_connection_pool): self.cache = cache_manager self.db = db_connection_pool # Batch write buffers self.ohlcv_buffer: List[Dict] = [] self.orderbook_buffer: List[Dict] = [] self.trade_buffer: List[Dict] = [] # Batch write settings self.batch_size = 100 self.batch_timeout_seconds = 5 async def ingest_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict): """ Ingest OHLCV candle. 1. Add to cache immediately 2. Buffer for batch write to DB """ # Immediate cache write self.cache.add_ohlcv_candle(symbol, timeframe, candle) # Buffer for DB write self.ohlcv_buffer.append({ 'symbol': symbol, 'timeframe': timeframe, **candle }) # Flush if buffer full if len(self.ohlcv_buffer) >= self.batch_size: await self._flush_ohlcv_buffer() async def ingest_orderbook_snapshot(self, symbol: str, snapshot: Dict): """Ingest order book snapshot""" # Immediate cache write self.cache.add_orderbook_snapshot(symbol, snapshot) # Calculate and cache imbalances imbalances = self._calculate_imbalances(symbol, snapshot) self.cache.add_imbalance_data(symbol, imbalances) # Buffer for DB write self.orderbook_buffer.append({ 'symbol': symbol, **snapshot }) # Flush if buffer full if len(self.orderbook_buffer) >= self.batch_size: await self._flush_orderbook_buffer() async def _flush_ohlcv_buffer(self): """Batch write OHLCV data to database""" if not self.ohlcv_buffer: return try: # Prepare batch insert values = [ ( item['timestamp'], item['symbol'], item['timeframe'], item['open'], item['high'], item['low'], item['close'], item['volume'], item.get('trade_count', 0) ) for item in self.ohlcv_buffer ] # Batch insert await self.db.executemany( """ INSERT INTO ohlcv_data (timestamp, symbol, timeframe, open_price, high_price, low_price, close_price, volume, trade_count) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (timestamp, symbol, timeframe) DO UPDATE SET close_price = EXCLUDED.close_price, high_price = GREATEST(ohlcv_data.high_price, EXCLUDED.high_price), low_price = LEAST(ohlcv_data.low_price, EXCLUDED.low_price), volume = ohlcv_data.volume + EXCLUDED.volume, trade_count = ohlcv_data.trade_count + EXCLUDED.trade_count """, values ) # Clear buffer self.ohlcv_buffer.clear() except Exception as e: logger.error(f"Error flushing OHLCV buffer: {e}") ``` ### 6. Migration System Migrate existing Parquet/pickle data to TimescaleDB. ```python class DataMigrationManager: """ Migrates existing data from Parquet/pickle files to TimescaleDB. Ensures data integrity and provides rollback capability. """ def __init__(self, db_connection_pool, cache_dir: Path): self.db = db_connection_pool self.cache_dir = cache_dir async def migrate_all_data(self): """Migrate all existing data to TimescaleDB""" logger.info("Starting data migration to TimescaleDB") # Migrate OHLCV data from Parquet files await self._migrate_ohlcv_data() # Migrate order book data if exists await self._migrate_orderbook_data() # Verify migration await self._verify_migration() logger.info("Data migration completed successfully") async def _migrate_ohlcv_data(self): """Migrate OHLCV data from Parquet files""" parquet_files = list(self.cache_dir.glob("*.parquet")) for parquet_file in parquet_files: try: # Parse filename: ETHUSDT_1m.parquet filename = parquet_file.stem parts = filename.split('_') if len(parts) != 2: continue symbol_raw = parts[0] timeframe = parts[1] # Convert symbol format symbol = self._convert_symbol_format(symbol_raw) # Read Parquet file df = pd.read_parquet(parquet_file) # Migrate data in batches await self._migrate_ohlcv_batch(symbol, timeframe, df) logger.info(f"Migrated {len(df)} rows from {parquet_file.name}") except Exception as e: logger.error(f"Error migrating {parquet_file}: {e}") async def _migrate_ohlcv_batch(self, symbol: str, timeframe: str, df: pd.DataFrame): """Migrate a batch of OHLCV data""" # Prepare data for insertion values = [] for idx, row in df.iterrows(): values.append(( row['timestamp'], symbol, timeframe, row['open'], row['high'], row['low'], row['close'], row['volume'], row.get('trade_count', 0) )) # Batch insert await self.db.executemany( """ INSERT INTO ohlcv_data (timestamp, symbol, timeframe, open_price, high_price, low_price, close_price, volume, trade_count) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (timestamp, symbol, timeframe) DO NOTHING """, values ) ``` ## Error Handling ### Data Validation ```python class DataValidator: """Validates all incoming data before storage""" @staticmethod def validate_ohlcv(candle: Dict) -> bool: """Validate OHLCV candle data""" try: # Check required fields required = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] if not all(field in candle for field in required): return False # Validate OHLC relationships if candle['high'] < candle['low']: logger.warning(f"Invalid OHLCV: high < low") return False if candle['high'] < candle['open'] or candle['high'] < candle['close']: logger.warning(f"Invalid OHLCV: high < open/close") return False if candle['low'] > candle['open'] or candle['low'] > candle['close']: logger.warning(f"Invalid OHLCV: low > open/close") return False # Validate positive volume if candle['volume'] < 0: logger.warning(f"Invalid OHLCV: negative volume") return False return True except Exception as e: logger.error(f"Error validating OHLCV: {e}") return False @staticmethod def validate_orderbook(orderbook: Dict) -> bool: """Validate order book data""" try: # Check required fields if 'bids' not in orderbook or 'asks' not in orderbook: return False # Validate bid/ask relationship if orderbook['bids'] and orderbook['asks']: best_bid = max(bid[0] for bid in orderbook['bids']) best_ask = min(ask[0] for ask in orderbook['asks']) if best_bid >= best_ask: logger.warning(f"Invalid orderbook: bid >= ask") return False return True except Exception as e: logger.error(f"Error validating orderbook: {e}") return False ``` ### Retry Logic ```python class RetryableDBOperation: """Wrapper for database operations with retry logic""" @staticmethod async def execute_with_retry( operation: Callable, max_retries: int = 3, backoff_seconds: float = 1.0 ): """Execute database operation with exponential backoff retry""" for attempt in range(max_retries): try: return await operation() except Exception as e: if attempt == max_retries - 1: logger.error(f"Operation failed after {max_retries} attempts: {e}") raise wait_time = backoff_seconds * (2 ** attempt) logger.warning(f"Operation failed (attempt {attempt + 1}), retrying in {wait_time}s: {e}") await asyncio.sleep(wait_time) ``` ## Testing Strategy ### Unit Tests 1. **Data Validation Tests** - Test OHLCV validation logic - Test order book validation logic - Test timestamp validation and timezone handling 2. **Cache Manager Tests** - Test cache insertion and retrieval - Test cache eviction logic - Test cache hit/miss statistics 3. **Data Model Tests** - Test InferenceDataFrame creation - Test OrderBookDataFrame creation - Test data serialization/deserialization ### Integration Tests 1. **Database Integration Tests** - Test TimescaleDB connection and queries - Test batch insert operations - Test continuous aggregates - Test compression and retention policies 2. **End-to-End Data Flow Tests** - Test real-time data ingestion → cache → database - Test historical data retrieval from database - Test multi-timeframe data alignment 3. **Migration Tests** - Test Parquet file migration - Test data integrity after migration - Test rollback capability ### Performance Tests 1. **Latency Tests** - Cache read latency (<10ms target) - Database query latency (<100ms target) - Batch write throughput (>1000 ops/sec target) 2. **Load Tests** - Concurrent read/write operations - High-frequency data ingestion - Large time-range queries 3. **Storage Tests** - Compression ratio validation (>80% target) - Storage growth over time - Query performance with compressed data ## Performance Optimization ### Query Optimization ```sql -- Use time_bucket for efficient time-range queries SELECT time_bucket('1 minute', timestamp) AS bucket, symbol, first(close_price, timestamp) AS price FROM ohlcv_data WHERE symbol = 'ETH/USDT' AND timeframe = '1s' AND timestamp >= NOW() - INTERVAL '1 hour' GROUP BY bucket, symbol ORDER BY bucket DESC; -- Use indexes for symbol-based queries CREATE INDEX CONCURRENTLY idx_ohlcv_symbol_tf_ts ON ohlcv_data (symbol, timeframe, timestamp DESC); ``` ### Caching Strategy 1. **Hot Data**: Last 5 minutes in memory (all symbols, all timeframes) 2. **Warm Data**: Last 1 hour in TimescaleDB uncompressed 3. **Cold Data**: Older than 1 hour in TimescaleDB compressed ### Batch Operations - Batch size: 100 records or 5 seconds (whichever comes first) - Use `executemany()` for bulk inserts - Use `COPY` command for large migrations ## Deployment Considerations ### Database Setup 1. Install TimescaleDB extension 2. Run schema creation scripts 3. Create hypertables and indexes 4. Set up continuous aggregates 5. Configure compression and retention policies ### Migration Process 1. **Phase 1**: Deploy new code with dual-write (Parquet + TimescaleDB) 2. **Phase 2**: Run migration script to backfill historical data 3. **Phase 3**: Verify data integrity 4. **Phase 4**: Switch reads to TimescaleDB 5. **Phase 5**: Deprecate Parquet writes 6. **Phase 6**: Archive old Parquet files ### Monitoring 1. **Database Metrics** - Query latency (p50, p95, p99) - Write throughput - Storage size and compression ratio - Connection pool utilization 2. **Cache Metrics** - Hit/miss ratio - Cache size - Eviction rate 3. **Application Metrics** - Data retrieval latency - Error rates - Data validation failures ## Security Considerations 1. **Database Access** - Use connection pooling with proper credentials - Implement read-only users for query-only operations - Use SSL/TLS for database connections 2. **Data Validation** - Validate all incoming data before storage - Sanitize inputs to prevent SQL injection - Implement rate limiting for API endpoints 3. **Backup and Recovery** - Regular database backups (daily) - Point-in-time recovery capability - Disaster recovery plan ## Future Enhancements 1. **Multi-Exchange Support** - Store data from multiple exchanges - Cross-exchange arbitrage analysis - Exchange-specific data normalization 2. **Advanced Analytics** - Real-time pattern detection - Anomaly detection - Predictive analytics 3. **Distributed Storage** - Horizontal scaling with TimescaleDB clustering - Read replicas for query load distribution - Geographic distribution for low-latency access