# Design Document: Unified Data Storage System

## Overview

This design document outlines the architecture for unifying all data storage and retrieval methods in the trading system. The current system uses multiple fragmented approaches (Parquet files, pickle files, in-memory caches, and TimescaleDB), which creates complexity and inconsistency. The unified system will consolidate these into a single, efficient TimescaleDB-based storage backend with a clean, unified API.

### Key Design Principles

1. **Single Source of Truth**: TimescaleDB as the primary storage backend for all time-series data
2. **Unified Interface**: One method (`get_inference_data()`) for all data retrieval needs
3. **Performance First**: In-memory caching for real-time data, optimized queries for historical data
4. **Backward Compatibility**: Seamless migration from existing storage formats
5. **Separation of Concerns**: Clear boundaries between storage, caching, and business logic

## Architecture

### High-Level Architecture

```
┌──────────────────────────────────────────────────────────────┐
│                      Application Layer                       │
│         (Models, Backtesting, Annotation, Dashboard)         │
└────────────────────┬─────────────────────────────────────────┘
                     │
                     ▼
┌──────────────────────────────────────────────────────────────┐
│                  Unified Data Provider API                   │
│                                                              │
│  get_inference_data(symbol, timestamp=None, context_window)  │
│  get_multi_timeframe_data(symbol, timeframes, timestamp)     │
│  get_order_book_data(symbol, timestamp, aggregation)         │
└────────────────────┬─────────────────────────────────────────┘
                     │
        ┌────────────┴────────────┐
        ▼                         ▼
┌──────────────────┐    ┌──────────────────┐
│   Cache Layer    │    │  Storage Layer   │
│   (In-Memory)    │    │  (TimescaleDB)   │
│                  │    │                  │
│ - Last 5 min     │    │ - OHLCV Data     │
│ - Real-time      │    │ - Order Book     │
│ - Low latency    │    │ - Trade Data     │
└──────────────────┘    │ - Aggregations   │
                        └──────────────────┘
```

### Data Flow

```
Real-Time Data Flow:
WebSocket → Tick Aggregator → Cache Layer → TimescaleDB (async)
                                   ↓
                        Application (fast read)

Historical Data Flow:
Application → Unified API → TimescaleDB → Cache (optional) → Application
```

## Components and Interfaces

### 1. Unified Data Provider

The central component that provides a single interface for all data access.

```python
from datetime import datetime
from typing import Dict, List, Optional

import pandas as pd


class UnifiedDataProvider:
    """
    Unified interface for all market data access.
    Handles both real-time and historical data retrieval.
    """

    def __init__(self, db_connection_pool, cache_manager):
        self.db = db_connection_pool
        self.cache = cache_manager
        self.symbols = ['ETH/USDT', 'BTC/USDT']
        self.timeframes = ['1s', '1m', '5m', '15m', '1h', '1d']

    async def get_inference_data(
        self,
        symbol: str,
        timestamp: Optional[datetime] = None,
        context_window_minutes: int = 5
    ) -> InferenceDataFrame:
        """
        Get complete inference data for a symbol at a specific time.

        Args:
            symbol: Trading symbol (e.g., 'ETH/USDT')
            timestamp: Target timestamp (None = latest real-time data)
            context_window_minutes: Minutes of context data before/after timestamp

        Returns:
            InferenceDataFrame with OHLCV, indicators, COB data, imbalances
        """

    async def get_multi_timeframe_data(
        self,
        symbol: str,
        timeframes: List[str],
        timestamp: Optional[datetime] = None,
        limit: int = 100
    ) -> Dict[str, pd.DataFrame]:
        """
        Get aligned multi-timeframe candlestick data.

        Args:
            symbol: Trading symbol
            timeframes: List of timeframes to retrieve
            timestamp: Target timestamp (None = latest)
            limit: Number of candles per timeframe

        Returns:
            Dictionary mapping timeframe to DataFrame
        """

    async def get_order_book_data(
        self,
        symbol: str,
        timestamp: Optional[datetime] = None,
        aggregation: str = '1s',
        limit: int = 300
    ) -> OrderBookDataFrame:
        """
        Get order book data with imbalance metrics.

        Args:
            symbol: Trading symbol
            timestamp: Target timestamp (None = latest)
            aggregation: Aggregation level ('raw', '1s', '1m')
            limit: Number of data points

        Returns:
            OrderBookDataFrame with bids, asks, imbalances
        """
```
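
The routing rule is simple: a `None` (or very recent) timestamp is served from the cache, anything older goes to TimescaleDB. The sketch below shows one way the method body could look; it is an illustration only, shown as a standalone function with an explicit `self`, and it assumes `_query_historical_data()` and `_build_inference_frame()` as hypothetical internal helpers that are not part of the committed interface.

```python
import time
from datetime import datetime, timedelta, timezone


async def get_inference_data(self, symbol, timestamp=None, context_window_minutes=5):
    """Sketch: serve recent requests from the cache, older ones from TimescaleDB."""
    started = time.monotonic()

    if timestamp is None or datetime.now(timezone.utc) - timestamp < timedelta(minutes=5):
        # Hot path: the last ~5 minutes live in the in-memory cache
        candles = {tf: self.cache.get_latest_ohlcv(symbol, tf) for tf in self.timeframes}
        source = 'cache'
    else:
        # Cold path: hypothetical helper that queries ohlcv_data for the context window
        candles = await self._query_historical_data(symbol, timestamp, context_window_minutes)
        source = 'database'

    # Hypothetical helper that packs the pieces into an InferenceDataFrame
    return self._build_inference_frame(
        symbol=symbol,
        timestamp=timestamp,
        candles=candles,
        data_source=source,
        query_latency_ms=(time.monotonic() - started) * 1000.0,
    )
```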

### 2. Storage Layer (TimescaleDB)

TimescaleDB schema and access patterns.

#### Database Schema

```sql
-- OHLCV Data (Hypertable)
CREATE TABLE ohlcv_data (
    timestamp TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    timeframe VARCHAR(10) NOT NULL,
    open_price DECIMAL(20,8) NOT NULL,
    high_price DECIMAL(20,8) NOT NULL,
    low_price DECIMAL(20,8) NOT NULL,
    close_price DECIMAL(20,8) NOT NULL,
    volume DECIMAL(30,8) NOT NULL,
    trade_count INTEGER,
    -- Technical Indicators (pre-calculated)
    rsi_14 DECIMAL(10,4),
    macd DECIMAL(20,8),
    macd_signal DECIMAL(20,8),
    bb_upper DECIMAL(20,8),
    bb_middle DECIMAL(20,8),
    bb_lower DECIMAL(20,8),
    PRIMARY KEY (timestamp, symbol, timeframe)
);

SELECT create_hypertable('ohlcv_data', 'timestamp');
CREATE INDEX idx_ohlcv_symbol_tf ON ohlcv_data (symbol, timeframe, timestamp DESC);

-- Order Book Snapshots (Hypertable)
CREATE TABLE order_book_snapshots (
    timestamp TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    exchange VARCHAR(20) NOT NULL,
    bids JSONB NOT NULL,  -- Top 50 levels
    asks JSONB NOT NULL,  -- Top 50 levels
    mid_price DECIMAL(20,8),
    spread DECIMAL(20,8),
    bid_volume DECIMAL(30,8),
    ask_volume DECIMAL(30,8),
    PRIMARY KEY (timestamp, symbol, exchange)
);

SELECT create_hypertable('order_book_snapshots', 'timestamp');
CREATE INDEX idx_obs_symbol ON order_book_snapshots (symbol, timestamp DESC);

-- Order Book Aggregated 1s (Hypertable)
CREATE TABLE order_book_1s_agg (
    timestamp TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    price_bucket DECIMAL(20,2) NOT NULL,  -- $1 buckets
    bid_volume DECIMAL(30,8),
    ask_volume DECIMAL(30,8),
    bid_count INTEGER,
    ask_count INTEGER,
    imbalance DECIMAL(10,6),
    PRIMARY KEY (timestamp, symbol, price_bucket)
);

SELECT create_hypertable('order_book_1s_agg', 'timestamp');
CREATE INDEX idx_ob1s_symbol ON order_book_1s_agg (symbol, timestamp DESC);

-- Order Book Imbalances (Hypertable)
CREATE TABLE order_book_imbalances (
    timestamp TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    imbalance_1s DECIMAL(10,6),
    imbalance_5s DECIMAL(10,6),
    imbalance_15s DECIMAL(10,6),
    imbalance_60s DECIMAL(10,6),
    volume_imbalance_1s DECIMAL(10,6),
    volume_imbalance_5s DECIMAL(10,6),
    volume_imbalance_15s DECIMAL(10,6),
    volume_imbalance_60s DECIMAL(10,6),
    price_range DECIMAL(10,2),
    PRIMARY KEY (timestamp, symbol)
);

SELECT create_hypertable('order_book_imbalances', 'timestamp');
CREATE INDEX idx_obi_symbol ON order_book_imbalances (symbol, timestamp DESC);

-- Trade Events (Hypertable)
CREATE TABLE trade_events (
    timestamp TIMESTAMPTZ NOT NULL,
    symbol VARCHAR(20) NOT NULL,
    exchange VARCHAR(20) NOT NULL,
    price DECIMAL(20,8) NOT NULL,
    size DECIMAL(30,8) NOT NULL,
    side VARCHAR(4) NOT NULL,
    trade_id VARCHAR(100) NOT NULL,
    PRIMARY KEY (timestamp, symbol, exchange, trade_id)
);

SELECT create_hypertable('trade_events', 'timestamp');
CREATE INDEX idx_trades_symbol ON trade_events (symbol, timestamp DESC);
```

#### Continuous Aggregates

```sql
-- 1m OHLCV from 1s data
CREATE MATERIALIZED VIEW ohlcv_1m_continuous
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', timestamp) AS timestamp,
    symbol,
    '1m' AS timeframe,
    first(open_price, timestamp) AS open_price,
    max(high_price) AS high_price,
    min(low_price) AS low_price,
    last(close_price, timestamp) AS close_price,
    sum(volume) AS volume,
    sum(trade_count) AS trade_count
FROM ohlcv_data
WHERE timeframe = '1s'
GROUP BY time_bucket('1 minute', timestamp), symbol;

-- 5m OHLCV from 1m data
CREATE MATERIALIZED VIEW ohlcv_5m_continuous
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('5 minutes', timestamp) AS timestamp,
    symbol,
    '5m' AS timeframe,
    first(open_price, timestamp) AS open_price,
    max(high_price) AS high_price,
    min(low_price) AS low_price,
    last(close_price, timestamp) AS close_price,
    sum(volume) AS volume,
    sum(trade_count) AS trade_count
FROM ohlcv_data
WHERE timeframe = '1m'
GROUP BY time_bucket('5 minutes', timestamp), symbol;

-- Similar for 15m, 1h, 1d
```

#### Compression Policies

```sql
-- Enable compression on each hypertable, then compress chunks older than the given interval
ALTER TABLE ohlcv_data SET (timescaledb.compress);
ALTER TABLE order_book_snapshots SET (timescaledb.compress);
ALTER TABLE order_book_1s_agg SET (timescaledb.compress);
ALTER TABLE order_book_imbalances SET (timescaledb.compress);
ALTER TABLE trade_events SET (timescaledb.compress);

SELECT add_compression_policy('ohlcv_data', INTERVAL '7 days');
SELECT add_compression_policy('order_book_snapshots', INTERVAL '1 day');
SELECT add_compression_policy('order_book_1s_agg', INTERVAL '2 days');
SELECT add_compression_policy('order_book_imbalances', INTERVAL '2 days');
SELECT add_compression_policy('trade_events', INTERVAL '7 days');
```

#### Retention Policies

```sql
-- Retain data for specified periods
SELECT add_retention_policy('order_book_snapshots', INTERVAL '30 days');
SELECT add_retention_policy('order_book_1s_agg', INTERVAL '60 days');
SELECT add_retention_policy('order_book_imbalances', INTERVAL '60 days');
SELECT add_retention_policy('trade_events', INTERVAL '90 days');
SELECT add_retention_policy('ohlcv_data', INTERVAL '2 years');
```

### 3. Cache Layer

In-memory caching for low-latency real-time data access.

```python
from collections import deque
from typing import Dict, List, Optional


class DataCacheManager:
    """
    Manages in-memory cache for real-time data.
    Provides <10ms latency for latest data access.
    """

    def __init__(self, cache_duration_seconds: int = 300):
        # Cache last 5 minutes of data
        self.cache_duration = cache_duration_seconds

        # In-memory storage
        self.ohlcv_cache: Dict[str, Dict[str, deque]] = {}
        self.orderbook_cache: Dict[str, deque] = {}
        self.imbalance_cache: Dict[str, deque] = {}
        self.trade_cache: Dict[str, deque] = {}

        # Cache statistics
        self.cache_hits = 0
        self.cache_misses = 0

    def add_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict):
        """Add OHLCV candle to cache"""

    def add_orderbook_snapshot(self, symbol: str, snapshot: Dict):
        """Add order book snapshot to cache"""

    def add_imbalance_data(self, symbol: str, imbalance: Dict):
        """Add imbalance metrics to cache"""

    def get_latest_ohlcv(self, symbol: str, timeframe: str, limit: int = 100) -> List[Dict]:
        """Get latest OHLCV candles from cache"""

    def get_latest_orderbook(self, symbol: str) -> Optional[Dict]:
        """Get latest order book snapshot from cache"""

    def get_latest_imbalances(self, symbol: str, limit: int = 60) -> List[Dict]:
        """Get latest imbalance metrics from cache"""

    def evict_old_data(self):
        """Remove data older than cache duration"""
```
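
A minimal sketch of how the OHLCV side of the cache and the eviction loop could be implemented with bounded `deque`s keyed by symbol and timeframe. The methods are shown as standalone functions with an explicit `self` for brevity; the `maxlen` of 300 is an assumption (roughly 5 minutes of 1s candles), and eviction assumes cached dicts carry a timezone-aware `timestamp` field.

```python
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import Dict, List


def add_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict):
    """Append a candle to the per-symbol, per-timeframe deque (bounded to ~5 minutes)."""
    per_symbol = self.ohlcv_cache.setdefault(symbol, {})
    per_symbol.setdefault(timeframe, deque(maxlen=300)).append(candle)


def get_latest_ohlcv(self, symbol: str, timeframe: str, limit: int = 100) -> List[Dict]:
    """Return the most recent candles (oldest first) and track hit/miss statistics."""
    candles = self.ohlcv_cache.get(symbol, {}).get(timeframe)
    if not candles:
        self.cache_misses += 1
        return []
    self.cache_hits += 1
    return list(candles)[-limit:]


def evict_old_data(self):
    """Drop order book / imbalance / trade entries older than the cache duration."""
    cutoff = datetime.now(timezone.utc) - timedelta(seconds=self.cache_duration)
    for cache in (self.orderbook_cache, self.imbalance_cache, self.trade_cache):
        for entries in cache.values():
            while entries and entries[0].get('timestamp', cutoff) < cutoff:
                entries.popleft()
```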

### 4. Data Models

Standardized data structures for all components.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import pandas as pd


@dataclass
class InferenceDataFrame:
    """Complete inference data for a single timestamp"""
    symbol: str
    timestamp: datetime

    # Multi-timeframe OHLCV
    ohlcv_1s: pd.DataFrame
    ohlcv_1m: pd.DataFrame
    ohlcv_5m: pd.DataFrame
    ohlcv_15m: pd.DataFrame
    ohlcv_1h: pd.DataFrame
    ohlcv_1d: pd.DataFrame

    # Order book data
    orderbook_snapshot: Optional[Dict]
    orderbook_1s_agg: pd.DataFrame

    # Imbalance metrics
    imbalances: pd.DataFrame  # Multi-timeframe imbalances

    # Technical indicators (pre-calculated)
    indicators: Dict[str, float]

    # Context window data (±N minutes)
    context_data: Optional[pd.DataFrame]

    # Metadata
    data_source: str  # 'cache' or 'database'
    query_latency_ms: float


@dataclass
class OrderBookDataFrame:
    """Order book data with imbalances"""
    symbol: str
    timestamp: datetime

    # Raw order book
    bids: List[Tuple[float, float]]  # (price, size)
    asks: List[Tuple[float, float]]

    # Aggregated data
    price_buckets: pd.DataFrame  # $1 buckets

    # Imbalance metrics
    imbalance_1s: float
    imbalance_5s: float
    imbalance_15s: float
    imbalance_60s: float

    # Volume-weighted imbalances
    volume_imbalance_1s: float
    volume_imbalance_5s: float
    volume_imbalance_15s: float
    volume_imbalance_60s: float

    # Statistics
    mid_price: float
    spread: float
    bid_volume: float
    ask_volume: float
```
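
For reference, this is roughly how a model or backtest would consume the unified structure. The field names are those defined above; the column names (`close_price`, `imbalance_1s`) assume the DataFrames carry the database column names, and `model.predict()` stands in for whatever model interface the application uses.

```python
async def run_inference_step(provider, model, symbol: str):
    """Illustrative consumer: fetch one InferenceDataFrame and feed it to a model."""
    frame = await provider.get_inference_data(symbol, timestamp=None, context_window_minutes=5)

    # Latest 1m close and the short-horizon order book imbalance
    last_close = frame.ohlcv_1m['close_price'].iloc[-1]
    imbalance_1s = frame.imbalances['imbalance_1s'].iloc[-1]

    prediction = model.predict(frame)  # model interface is application-specific
    return prediction, frame.data_source, frame.query_latency_ms
```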

### 5. Data Ingestion Pipeline

Real-time data ingestion with async persistence.

```python
import logging
from typing import Dict, List

logger = logging.getLogger(__name__)


class DataIngestionPipeline:
    """
    Handles real-time data ingestion from WebSocket sources.
    Writes to cache immediately, persists to DB asynchronously.
    """

    def __init__(self, cache_manager, db_connection_pool):
        self.cache = cache_manager
        self.db = db_connection_pool

        # Batch write buffers
        self.ohlcv_buffer: List[Dict] = []
        self.orderbook_buffer: List[Dict] = []
        self.trade_buffer: List[Dict] = []

        # Batch write settings
        self.batch_size = 100
        self.batch_timeout_seconds = 5

    async def ingest_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict):
        """
        Ingest OHLCV candle.
        1. Add to cache immediately
        2. Buffer for batch write to DB
        """
        # Immediate cache write
        self.cache.add_ohlcv_candle(symbol, timeframe, candle)

        # Buffer for DB write
        self.ohlcv_buffer.append({
            'symbol': symbol,
            'timeframe': timeframe,
            **candle
        })

        # Flush if buffer full
        if len(self.ohlcv_buffer) >= self.batch_size:
            await self._flush_ohlcv_buffer()

    async def ingest_orderbook_snapshot(self, symbol: str, snapshot: Dict):
        """Ingest order book snapshot"""
        # Immediate cache write
        self.cache.add_orderbook_snapshot(symbol, snapshot)

        # Calculate and cache imbalances
        imbalances = self._calculate_imbalances(symbol, snapshot)
        self.cache.add_imbalance_data(symbol, imbalances)

        # Buffer for DB write
        self.orderbook_buffer.append({
            'symbol': symbol,
            **snapshot
        })

        # Flush if buffer full
        if len(self.orderbook_buffer) >= self.batch_size:
            await self._flush_orderbook_buffer()

    async def _flush_ohlcv_buffer(self):
        """Batch write OHLCV data to database"""
        if not self.ohlcv_buffer:
            return

        try:
            # Prepare batch insert
            values = [
                (
                    item['timestamp'],
                    item['symbol'],
                    item['timeframe'],
                    item['open'],
                    item['high'],
                    item['low'],
                    item['close'],
                    item['volume'],
                    item.get('trade_count', 0)
                )
                for item in self.ohlcv_buffer
            ]

            # Batch insert
            await self.db.executemany(
                """
                INSERT INTO ohlcv_data
                (timestamp, symbol, timeframe, open_price, high_price,
                 low_price, close_price, volume, trade_count)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                ON CONFLICT (timestamp, symbol, timeframe) DO UPDATE
                SET close_price = EXCLUDED.close_price,
                    high_price = GREATEST(ohlcv_data.high_price, EXCLUDED.high_price),
                    low_price = LEAST(ohlcv_data.low_price, EXCLUDED.low_price),
                    volume = ohlcv_data.volume + EXCLUDED.volume,
                    trade_count = ohlcv_data.trade_count + EXCLUDED.trade_count
                """,
                values
            )

            # Clear buffer
            self.ohlcv_buffer.clear()

        except Exception as e:
            logger.error(f"Error flushing OHLCV buffer: {e}")
```
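
The pipeline references a `_calculate_imbalances()` helper that is not spelled out above. A plausible sketch follows, shown as a standalone function with an explicit `self`, assuming the common definition of order book imbalance, `(bid_volume - ask_volume) / (bid_volume + ask_volume)`, and deriving the 5s/15s/60s windows by averaging the recent 1s values held in the cache; both choices are illustrative, not part of the design contract.

```python
from datetime import datetime, timezone
from statistics import fmean
from typing import Dict


def _calculate_imbalances(self, symbol: str, snapshot: Dict) -> Dict:
    """Sketch: compute volume imbalance from a snapshot and roll it up over cached history."""
    bid_volume = sum(size for _, size in snapshot.get('bids', []))
    ask_volume = sum(size for _, size in snapshot.get('asks', []))
    total = bid_volume + ask_volume
    imbalance_1s = (bid_volume - ask_volume) / total if total else 0.0

    # Roll up longer windows from the 1s values already in the cache
    history = [entry['imbalance_1s'] for entry in self.cache.get_latest_imbalances(symbol, limit=60)]

    def window(seconds: int) -> float:
        recent = history[-seconds:] + [imbalance_1s]
        return fmean(recent)

    return {
        'timestamp': datetime.now(timezone.utc),
        'imbalance_1s': imbalance_1s,
        'imbalance_5s': window(5),
        'imbalance_15s': window(15),
        'imbalance_60s': window(60),
        'bid_volume': bid_volume,
        'ask_volume': ask_volume,
    }
```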

### 6. Migration System

Migrate existing Parquet/pickle data to TimescaleDB.

```python
import logging
from pathlib import Path

import pandas as pd

logger = logging.getLogger(__name__)


class DataMigrationManager:
    """
    Migrates existing data from Parquet/pickle files to TimescaleDB.
    Ensures data integrity and provides rollback capability.
    """

    def __init__(self, db_connection_pool, cache_dir: Path):
        self.db = db_connection_pool
        self.cache_dir = cache_dir

    async def migrate_all_data(self):
        """Migrate all existing data to TimescaleDB"""
        logger.info("Starting data migration to TimescaleDB")

        # Migrate OHLCV data from Parquet files
        await self._migrate_ohlcv_data()

        # Migrate order book data if it exists
        await self._migrate_orderbook_data()

        # Verify migration
        await self._verify_migration()

        logger.info("Data migration completed successfully")

    async def _migrate_ohlcv_data(self):
        """Migrate OHLCV data from Parquet files"""
        parquet_files = list(self.cache_dir.glob("*.parquet"))

        for parquet_file in parquet_files:
            try:
                # Parse filename: ETHUSDT_1m.parquet
                filename = parquet_file.stem
                parts = filename.split('_')

                if len(parts) != 2:
                    continue

                symbol_raw = parts[0]
                timeframe = parts[1]

                # Convert symbol format
                symbol = self._convert_symbol_format(symbol_raw)

                # Read Parquet file
                df = pd.read_parquet(parquet_file)

                # Migrate data in batches
                await self._migrate_ohlcv_batch(symbol, timeframe, df)

                logger.info(f"Migrated {len(df)} rows from {parquet_file.name}")

            except Exception as e:
                logger.error(f"Error migrating {parquet_file}: {e}")

    async def _migrate_ohlcv_batch(self, symbol: str, timeframe: str, df: pd.DataFrame):
        """Migrate a batch of OHLCV data"""
        # Prepare data for insertion
        values = []
        for idx, row in df.iterrows():
            values.append((
                row['timestamp'],
                symbol,
                timeframe,
                row['open'],
                row['high'],
                row['low'],
                row['close'],
                row['volume'],
                row.get('trade_count', 0)
            ))

        # Batch insert
        await self.db.executemany(
            """
            INSERT INTO ohlcv_data
            (timestamp, symbol, timeframe, open_price, high_price,
             low_price, close_price, volume, trade_count)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ON CONFLICT (timestamp, symbol, timeframe) DO NOTHING
            """,
            values
        )
```
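
The `_convert_symbol_format()` helper referenced above is not defined in this document. A minimal sketch, assuming filenames use concatenated symbols (e.g. `ETHUSDT`) and the database uses the slash-delimited form (`ETH/USDT`) seen elsewhere in this design; the quote-currency list is an assumption:

```python
def _convert_symbol_format(self, symbol_raw: str) -> str:
    """Sketch: map 'ETHUSDT' -> 'ETH/USDT' by matching a known quote-currency suffix."""
    quote_currencies = ('USDT', 'USDC', 'BTC', 'ETH')  # assumed set of quote assets
    for quote in quote_currencies:
        if symbol_raw.endswith(quote) and len(symbol_raw) > len(quote):
            return f"{symbol_raw[:-len(quote)]}/{quote}"
    # Fall back to the raw symbol if no known quote currency matches
    return symbol_raw
```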

## Error Handling

### Data Validation

```python
import logging
from typing import Dict

logger = logging.getLogger(__name__)


class DataValidator:
    """Validates all incoming data before storage"""

    @staticmethod
    def validate_ohlcv(candle: Dict) -> bool:
        """Validate OHLCV candle data"""
        try:
            # Check required fields
            required = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
            if not all(field in candle for field in required):
                return False

            # Validate OHLC relationships
            if candle['high'] < candle['low']:
                logger.warning("Invalid OHLCV: high < low")
                return False

            if candle['high'] < candle['open'] or candle['high'] < candle['close']:
                logger.warning("Invalid OHLCV: high < open/close")
                return False

            if candle['low'] > candle['open'] or candle['low'] > candle['close']:
                logger.warning("Invalid OHLCV: low > open/close")
                return False

            # Validate non-negative volume
            if candle['volume'] < 0:
                logger.warning("Invalid OHLCV: negative volume")
                return False

            return True

        except Exception as e:
            logger.error(f"Error validating OHLCV: {e}")
            return False

    @staticmethod
    def validate_orderbook(orderbook: Dict) -> bool:
        """Validate order book data"""
        try:
            # Check required fields
            if 'bids' not in orderbook or 'asks' not in orderbook:
                return False

            # Validate bid/ask relationship
            if orderbook['bids'] and orderbook['asks']:
                best_bid = max(bid[0] for bid in orderbook['bids'])
                best_ask = min(ask[0] for ask in orderbook['asks'])

                if best_bid >= best_ask:
                    logger.warning("Invalid orderbook: bid >= ask")
                    return False

            return True

        except Exception as e:
            logger.error(f"Error validating orderbook: {e}")
            return False
```
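
In practice the validator sits at the front of the ingestion pipeline, so rejected payloads never reach the cache or the database. A short illustrative wrapper, assuming it lives in the same module as the validator (for `logger`) and uses the `DataIngestionPipeline` from section 5:

```python
async def safe_ingest_ohlcv(pipeline, symbol: str, timeframe: str, candle: dict) -> bool:
    """Illustration: gate ingestion on validation and report whether the candle was accepted."""
    if not DataValidator.validate_ohlcv(candle):
        logger.warning(f"Dropping invalid OHLCV candle for {symbol} {timeframe}")
        return False
    await pipeline.ingest_ohlcv_candle(symbol, timeframe, candle)
    return True
```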

### Retry Logic

```python
import asyncio
import logging
from typing import Callable

logger = logging.getLogger(__name__)


class RetryableDBOperation:
    """Wrapper for database operations with retry logic"""

    @staticmethod
    async def execute_with_retry(
        operation: Callable,
        max_retries: int = 3,
        backoff_seconds: float = 1.0
    ):
        """Execute database operation with exponential backoff retry"""
        for attempt in range(max_retries):
            try:
                return await operation()
            except Exception as e:
                if attempt == max_retries - 1:
                    logger.error(f"Operation failed after {max_retries} attempts: {e}")
                    raise

                wait_time = backoff_seconds * (2 ** attempt)
                logger.warning(f"Operation failed (attempt {attempt + 1}), retrying in {wait_time}s: {e}")
                await asyncio.sleep(wait_time)
```
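
The wrapper takes any zero-argument coroutine factory, so existing flush methods can be retried without modification. For example, with the defaults the waits before the second and third attempts are 1s and 2s:

```python
# Retry the OHLCV flush up to three times with exponential backoff
await RetryableDBOperation.execute_with_retry(
    lambda: pipeline._flush_ohlcv_buffer(),
    max_retries=3,
    backoff_seconds=1.0,
)
```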

## Testing Strategy

### Unit Tests

1. **Data Validation Tests**
   - Test OHLCV validation logic
   - Test order book validation logic
   - Test timestamp validation and timezone handling

2. **Cache Manager Tests**
   - Test cache insertion and retrieval
   - Test cache eviction logic
   - Test cache hit/miss statistics

3. **Data Model Tests**
   - Test InferenceDataFrame creation
   - Test OrderBookDataFrame creation
   - Test data serialization/deserialization

### Integration Tests

1. **Database Integration Tests**
   - Test TimescaleDB connection and queries
   - Test batch insert operations
   - Test continuous aggregates
   - Test compression and retention policies

2. **End-to-End Data Flow Tests**
   - Test real-time data ingestion → cache → database
   - Test historical data retrieval from database
   - Test multi-timeframe data alignment

3. **Migration Tests**
   - Test Parquet file migration
   - Test data integrity after migration
   - Test rollback capability

### Performance Tests

1. **Latency Tests**
   - Cache read latency (<10ms target)
   - Database query latency (<100ms target)
   - Batch write throughput (>1000 ops/sec target)

2. **Load Tests**
   - Concurrent read/write operations
   - High-frequency data ingestion
   - Large time-range queries

3. **Storage Tests**
   - Compression ratio validation (>80% target)
   - Storage growth over time
   - Query performance with compressed data

## Performance Optimization

### Query Optimization

```sql
-- Use time_bucket for efficient time-range queries
SELECT
    time_bucket('1 minute', timestamp) AS bucket,
    symbol,
    first(close_price, timestamp) AS price
FROM ohlcv_data
WHERE symbol = 'ETH/USDT'
    AND timeframe = '1s'
    AND timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY bucket, symbol
ORDER BY bucket DESC;

-- Use indexes for symbol-based queries
CREATE INDEX CONCURRENTLY idx_ohlcv_symbol_tf_ts
ON ohlcv_data (symbol, timeframe, timestamp DESC);
```
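
On the application side, the same pattern maps directly onto the connection pool used throughout this document. The sketch below assumes an `asyncpg`-style pool (consistent with the `$1` placeholders in the ingestion code); the exact pool API is an implementation detail.

```python
import pandas as pd


async def fetch_recent_ohlcv(pool, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame:
    """Fetch the most recent candles for a symbol/timeframe as a DataFrame."""
    rows = await pool.fetch(
        """
        SELECT timestamp, open_price, high_price, low_price, close_price, volume
        FROM ohlcv_data
        WHERE symbol = $1 AND timeframe = $2
        ORDER BY timestamp DESC
        LIMIT $3
        """,
        symbol, timeframe, limit,
    )
    # Reverse so the frame is oldest-first, which is what indicator calculations expect
    return pd.DataFrame([dict(r) for r in rows][::-1])
```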

### Caching Strategy

1. **Hot Data**: Last 5 minutes in memory (all symbols, all timeframes)
2. **Warm Data**: Last 1 hour in TimescaleDB, uncompressed
3. **Cold Data**: Older than 1 hour in TimescaleDB, compressed

### Batch Operations

- Batch size: 100 records or 5 seconds, whichever comes first (see the flush-loop sketch below)
- Use `executemany()` for bulk inserts
- Use `COPY` command for large migrations
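
The size threshold is handled inline in the ingestion methods; the 5-second bound needs a small background task. A sketch of that task, assuming it runs alongside the `DataIngestionPipeline` from section 5 (the task itself is not part of the interface above):

```python
import asyncio


async def periodic_flush(pipeline, interval_seconds: float = 5.0):
    """Flush ingestion buffers on a timer so data never waits longer than the batch timeout."""
    while True:
        await asyncio.sleep(interval_seconds)
        await pipeline._flush_ohlcv_buffer()
        await pipeline._flush_orderbook_buffer()

# Started once at application startup, e.g.:
# asyncio.create_task(periodic_flush(pipeline, pipeline.batch_timeout_seconds))
```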

## Deployment Considerations

### Database Setup

1. Install TimescaleDB extension
2. Run schema creation scripts
3. Create hypertables and indexes
4. Set up continuous aggregates
5. Configure compression and retention policies

### Migration Process

1. **Phase 1**: Deploy new code with dual-write (Parquet + TimescaleDB)
2. **Phase 2**: Run migration script to backfill historical data
3. **Phase 3**: Verify data integrity
4. **Phase 4**: Switch reads to TimescaleDB
5. **Phase 5**: Deprecate Parquet writes
6. **Phase 6**: Archive old Parquet files

### Monitoring

1. **Database Metrics**
   - Query latency (p50, p95, p99)
   - Write throughput
   - Storage size and compression ratio
   - Connection pool utilization

2. **Cache Metrics**
   - Hit/miss ratio
   - Cache size
   - Eviction rate

3. **Application Metrics**
   - Data retrieval latency
   - Error rates
   - Data validation failures

## Security Considerations

1. **Database Access**
   - Use connection pooling with proper credentials
   - Implement read-only users for query-only operations
   - Use SSL/TLS for database connections

2. **Data Validation**
   - Validate all incoming data before storage
   - Sanitize inputs to prevent SQL injection
   - Implement rate limiting for API endpoints

3. **Backup and Recovery**
   - Regular database backups (daily)
   - Point-in-time recovery capability
   - Disaster recovery plan

## Future Enhancements

1. **Multi-Exchange Support**
   - Store data from multiple exchanges
   - Cross-exchange arbitrage analysis
   - Exchange-specific data normalization

2. **Advanced Analytics**
   - Real-time pattern detection
   - Anomaly detection
   - Predictive analytics

3. **Distributed Storage**
   - Horizontal scaling with TimescaleDB clustering
   - Read replicas for query load distribution
   - Geographic distribution for low-latency access