diff --git a/.kiro/specs/manual-trade-annotation-ui/design.md b/.kiro/specs/5.manual-trade-annotation-ui/design.md similarity index 100% rename from .kiro/specs/manual-trade-annotation-ui/design.md rename to .kiro/specs/5.manual-trade-annotation-ui/design.md diff --git a/.kiro/specs/manual-trade-annotation-ui/requirements.md b/.kiro/specs/5.manual-trade-annotation-ui/requirements.md similarity index 100% rename from .kiro/specs/manual-trade-annotation-ui/requirements.md rename to .kiro/specs/5.manual-trade-annotation-ui/requirements.md diff --git a/.kiro/specs/manual-trade-annotation-ui/tasks.md b/.kiro/specs/5.manual-trade-annotation-ui/tasks.md similarity index 100% rename from .kiro/specs/manual-trade-annotation-ui/tasks.md rename to .kiro/specs/5.manual-trade-annotation-ui/tasks.md diff --git a/.kiro/specs/unified-data-storage/design.md b/.kiro/specs/unified-data-storage/design.md new file mode 100644 index 0000000..ef30d29 --- /dev/null +++ b/.kiro/specs/unified-data-storage/design.md @@ -0,0 +1,860 @@ +# Design Document: Unified Data Storage System + +## Overview + +This design document outlines the architecture for unifying all data storage and retrieval methods in the trading system. The current system uses multiple fragmented approaches (Parquet files, pickle files, in-memory caches, and TimescaleDB) which creates complexity and inconsistency. The unified system will consolidate these into a single, efficient TimescaleDB-based storage backend with a clean, unified API. + +### Key Design Principles + +1. **Single Source of Truth**: TimescaleDB as the primary storage backend for all time-series data +2. **Unified Interface**: One method (`get_inference_data()`) for all data retrieval needs +3. **Performance First**: In-memory caching for real-time data, optimized queries for historical data +4. **Backward Compatibility**: Seamless migration from existing storage formats +5. **Separation of Concerns**: Clear boundaries between storage, caching, and business logic + +## Architecture + +### High-Level Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Application Layer │ +│ (Models, Backtesting, Annotation, Dashboard) │ +└────────────────────┬────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ Unified Data Provider API │ +│ │ +│ get_inference_data(symbol, timestamp=None, context_window) │ +│ get_multi_timeframe_data(symbol, timeframes, timestamp) │ +│ get_order_book_data(symbol, timestamp, aggregation) │ +└────────────────────┬────────────────────────────────────────┘ + │ + ┌────────────┴────────────┐ + ▼ ▼ +┌──────────────────┐ ┌──────────────────┐ +│ Cache Layer │ │ Storage Layer │ +│ (In-Memory) │ │ (TimescaleDB) │ +│ │ │ │ +│ - Last 5 min │ │ - OHLCV Data │ +│ - Real-time │ │ - Order Book │ +│ - Low latency │ │ - Trade Data │ +└──────────────────┘ │ - Aggregations │ + └──────────────────┘ +``` + +### Data Flow + +``` +Real-Time Data Flow: +WebSocket → Tick Aggregator → Cache Layer → TimescaleDB (async) + ↓ + Application (fast read) + +Historical Data Flow: +Application → Unified API → TimescaleDB → Cache (optional) → Application +``` + +## Components and Interfaces + +### 1. Unified Data Provider + +The central component that provides a single interface for all data access. + +```python +class UnifiedDataProvider: + """ + Unified interface for all market data access. + Handles both real-time and historical data retrieval. 
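+
+    Illustrative usage sketch (not a definitive API; assumes an asyncpg
+    connection pool `pool`, a DataCacheManager instance `cache`, and
+    `from datetime import datetime, timezone`, called from an async context):
+
+        provider = UnifiedDataProvider(db_connection_pool=pool, cache_manager=cache)
+
+        # Latest real-time data, served from the in-memory cache
+        live = await provider.get_inference_data('ETH/USDT')
+
+        # Historical data at a specific UTC timestamp with +/-5 minutes of context
+        snapshot = await provider.get_inference_data(
+            'ETH/USDT',
+            timestamp=datetime(2024, 1, 15, 12, 30, tzinfo=timezone.utc),
+            context_window_minutes=5,
+        )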
+ """ + + def __init__(self, db_connection_pool, cache_manager): + self.db = db_connection_pool + self.cache = cache_manager + self.symbols = ['ETH/USDT', 'BTC/USDT'] + self.timeframes = ['1s', '1m', '5m', '15m', '1h', '1d'] + + async def get_inference_data( + self, + symbol: str, + timestamp: Optional[datetime] = None, + context_window_minutes: int = 5 + ) -> InferenceDataFrame: + """ + Get complete inference data for a symbol at a specific time. + + Args: + symbol: Trading symbol (e.g., 'ETH/USDT') + timestamp: Target timestamp (None = latest real-time data) + context_window_minutes: Minutes of context data before/after timestamp + + Returns: + InferenceDataFrame with OHLCV, indicators, COB data, imbalances + """ + + async def get_multi_timeframe_data( + self, + symbol: str, + timeframes: List[str], + timestamp: Optional[datetime] = None, + limit: int = 100 + ) -> Dict[str, pd.DataFrame]: + """ + Get aligned multi-timeframe candlestick data. + + Args: + symbol: Trading symbol + timeframes: List of timeframes to retrieve + timestamp: Target timestamp (None = latest) + limit: Number of candles per timeframe + + Returns: + Dictionary mapping timeframe to DataFrame + """ + + async def get_order_book_data( + self, + symbol: str, + timestamp: Optional[datetime] = None, + aggregation: str = '1s', + limit: int = 300 + ) -> OrderBookDataFrame: + """ + Get order book data with imbalance metrics. + + Args: + symbol: Trading symbol + timestamp: Target timestamp (None = latest) + aggregation: Aggregation level ('raw', '1s', '1m') + limit: Number of data points + + Returns: + OrderBookDataFrame with bids, asks, imbalances + """ +``` + +### 2. Storage Layer (TimescaleDB) + +TimescaleDB schema and access patterns. + +#### Database Schema + +```sql +-- OHLCV Data (Hypertable) +CREATE TABLE ohlcv_data ( + timestamp TIMESTAMPTZ NOT NULL, + symbol VARCHAR(20) NOT NULL, + timeframe VARCHAR(10) NOT NULL, + open_price DECIMAL(20,8) NOT NULL, + high_price DECIMAL(20,8) NOT NULL, + low_price DECIMAL(20,8) NOT NULL, + close_price DECIMAL(20,8) NOT NULL, + volume DECIMAL(30,8) NOT NULL, + trade_count INTEGER, + -- Technical Indicators (pre-calculated) + rsi_14 DECIMAL(10,4), + macd DECIMAL(20,8), + macd_signal DECIMAL(20,8), + bb_upper DECIMAL(20,8), + bb_middle DECIMAL(20,8), + bb_lower DECIMAL(20,8), + PRIMARY KEY (timestamp, symbol, timeframe) +); + +SELECT create_hypertable('ohlcv_data', 'timestamp'); +CREATE INDEX idx_ohlcv_symbol_tf ON ohlcv_data (symbol, timeframe, timestamp DESC); + +-- Order Book Snapshots (Hypertable) +CREATE TABLE order_book_snapshots ( + timestamp TIMESTAMPTZ NOT NULL, + symbol VARCHAR(20) NOT NULL, + exchange VARCHAR(20) NOT NULL, + bids JSONB NOT NULL, -- Top 50 levels + asks JSONB NOT NULL, -- Top 50 levels + mid_price DECIMAL(20,8), + spread DECIMAL(20,8), + bid_volume DECIMAL(30,8), + ask_volume DECIMAL(30,8), + PRIMARY KEY (timestamp, symbol, exchange) +); + +SELECT create_hypertable('order_book_snapshots', 'timestamp'); +CREATE INDEX idx_obs_symbol ON order_book_snapshots (symbol, timestamp DESC); + +-- Order Book Aggregated 1s (Hypertable) +CREATE TABLE order_book_1s_agg ( + timestamp TIMESTAMPTZ NOT NULL, + symbol VARCHAR(20) NOT NULL, + price_bucket DECIMAL(20,2) NOT NULL, -- $1 buckets + bid_volume DECIMAL(30,8), + ask_volume DECIMAL(30,8), + bid_count INTEGER, + ask_count INTEGER, + imbalance DECIMAL(10,6), + PRIMARY KEY (timestamp, symbol, price_bucket) +); + +SELECT create_hypertable('order_book_1s_agg', 'timestamp'); +CREATE INDEX idx_ob1s_symbol ON order_book_1s_agg 
(symbol, timestamp DESC); + +-- Order Book Imbalances (Hypertable) +CREATE TABLE order_book_imbalances ( + timestamp TIMESTAMPTZ NOT NULL, + symbol VARCHAR(20) NOT NULL, + imbalance_1s DECIMAL(10,6), + imbalance_5s DECIMAL(10,6), + imbalance_15s DECIMAL(10,6), + imbalance_60s DECIMAL(10,6), + volume_imbalance_1s DECIMAL(10,6), + volume_imbalance_5s DECIMAL(10,6), + volume_imbalance_15s DECIMAL(10,6), + volume_imbalance_60s DECIMAL(10,6), + price_range DECIMAL(10,2), + PRIMARY KEY (timestamp, symbol) +); + +SELECT create_hypertable('order_book_imbalances', 'timestamp'); +CREATE INDEX idx_obi_symbol ON order_book_imbalances (symbol, timestamp DESC); + +-- Trade Events (Hypertable) +CREATE TABLE trade_events ( + timestamp TIMESTAMPTZ NOT NULL, + symbol VARCHAR(20) NOT NULL, + exchange VARCHAR(20) NOT NULL, + price DECIMAL(20,8) NOT NULL, + size DECIMAL(30,8) NOT NULL, + side VARCHAR(4) NOT NULL, + trade_id VARCHAR(100) NOT NULL, + PRIMARY KEY (timestamp, symbol, exchange, trade_id) +); + +SELECT create_hypertable('trade_events', 'timestamp'); +CREATE INDEX idx_trades_symbol ON trade_events (symbol, timestamp DESC); +``` + +#### Continuous Aggregates + +```sql +-- 1m OHLCV from 1s data +CREATE MATERIALIZED VIEW ohlcv_1m_continuous +WITH (timescaledb.continuous) AS +SELECT + time_bucket('1 minute', timestamp) AS timestamp, + symbol, + '1m' AS timeframe, + first(open_price, timestamp) AS open_price, + max(high_price) AS high_price, + min(low_price) AS low_price, + last(close_price, timestamp) AS close_price, + sum(volume) AS volume, + sum(trade_count) AS trade_count +FROM ohlcv_data +WHERE timeframe = '1s' +GROUP BY time_bucket('1 minute', timestamp), symbol; + +-- 5m OHLCV from 1m data +CREATE MATERIALIZED VIEW ohlcv_5m_continuous +WITH (timescaledb.continuous) AS +SELECT + time_bucket('5 minutes', timestamp) AS timestamp, + symbol, + '5m' AS timeframe, + first(open_price, timestamp) AS open_price, + max(high_price) AS high_price, + min(low_price) AS low_price, + last(close_price, timestamp) AS close_price, + sum(volume) AS volume, + sum(trade_count) AS trade_count +FROM ohlcv_data +WHERE timeframe = '1m' +GROUP BY time_bucket('5 minutes', timestamp), symbol; + +-- Similar for 15m, 1h, 1d +``` + +#### Compression Policies + +```sql +-- Compress data older than 7 days +SELECT add_compression_policy('ohlcv_data', INTERVAL '7 days'); +SELECT add_compression_policy('order_book_snapshots', INTERVAL '1 day'); +SELECT add_compression_policy('order_book_1s_agg', INTERVAL '2 days'); +SELECT add_compression_policy('order_book_imbalances', INTERVAL '2 days'); +SELECT add_compression_policy('trade_events', INTERVAL '7 days'); +``` + +#### Retention Policies + +```sql +-- Retain data for specified periods +SELECT add_retention_policy('order_book_snapshots', INTERVAL '30 days'); +SELECT add_retention_policy('order_book_1s_agg', INTERVAL '60 days'); +SELECT add_retention_policy('order_book_imbalances', INTERVAL '60 days'); +SELECT add_retention_policy('trade_events', INTERVAL '90 days'); +SELECT add_retention_policy('ohlcv_data', INTERVAL '2 years'); +``` + +### 3. Cache Layer + +In-memory caching for low-latency real-time data access. + +```python +class DataCacheManager: + """ + Manages in-memory cache for real-time data. + Provides <10ms latency for latest data access. 
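+
+    Illustrative usage (the methods below are interface sketches; `candle` is
+    assumed to be a dict with timestamp/open/high/low/close/volume keys):
+
+        cache = DataCacheManager(cache_duration_seconds=300)
+        cache.add_ohlcv_candle('ETH/USDT', '1s', candle)
+        recent = cache.get_latest_ohlcv('ETH/USDT', '1s', limit=60)
+        book = cache.get_latest_orderbook('ETH/USDT')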
+ """ + + def __init__(self, cache_duration_seconds: int = 300): + # Cache last 5 minutes of data + self.cache_duration = cache_duration_seconds + + # In-memory storage + self.ohlcv_cache: Dict[str, Dict[str, deque]] = {} + self.orderbook_cache: Dict[str, deque] = {} + self.imbalance_cache: Dict[str, deque] = {} + self.trade_cache: Dict[str, deque] = {} + + # Cache statistics + self.cache_hits = 0 + self.cache_misses = 0 + + def add_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict): + """Add OHLCV candle to cache""" + + def add_orderbook_snapshot(self, symbol: str, snapshot: Dict): + """Add order book snapshot to cache""" + + def add_imbalance_data(self, symbol: str, imbalance: Dict): + """Add imbalance metrics to cache""" + + def get_latest_ohlcv(self, symbol: str, timeframe: str, limit: int = 100) -> List[Dict]: + """Get latest OHLCV candles from cache""" + + def get_latest_orderbook(self, symbol: str) -> Optional[Dict]: + """Get latest order book snapshot from cache""" + + def get_latest_imbalances(self, symbol: str, limit: int = 60) -> List[Dict]: + """Get latest imbalance metrics from cache""" + + def evict_old_data(self): + """Remove data older than cache duration""" +``` + +### 4. Data Models + +Standardized data structures for all components. + +```python +@dataclass +class InferenceDataFrame: + """Complete inference data for a single timestamp""" + symbol: str + timestamp: datetime + + # Multi-timeframe OHLCV + ohlcv_1s: pd.DataFrame + ohlcv_1m: pd.DataFrame + ohlcv_5m: pd.DataFrame + ohlcv_15m: pd.DataFrame + ohlcv_1h: pd.DataFrame + ohlcv_1d: pd.DataFrame + + # Order book data + orderbook_snapshot: Optional[Dict] + orderbook_1s_agg: pd.DataFrame + + # Imbalance metrics + imbalances: pd.DataFrame # Multi-timeframe imbalances + + # Technical indicators (pre-calculated) + indicators: Dict[str, float] + + # Context window data (±N minutes) + context_data: Optional[pd.DataFrame] + + # Metadata + data_source: str # 'cache' or 'database' + query_latency_ms: float + +@dataclass +class OrderBookDataFrame: + """Order book data with imbalances""" + symbol: str + timestamp: datetime + + # Raw order book + bids: List[Tuple[float, float]] # (price, size) + asks: List[Tuple[float, float]] + + # Aggregated data + price_buckets: pd.DataFrame # $1 buckets + + # Imbalance metrics + imbalance_1s: float + imbalance_5s: float + imbalance_15s: float + imbalance_60s: float + + # Volume-weighted imbalances + volume_imbalance_1s: float + volume_imbalance_5s: float + volume_imbalance_15s: float + volume_imbalance_60s: float + + # Statistics + mid_price: float + spread: float + bid_volume: float + ask_volume: float +``` + +### 5. Data Ingestion Pipeline + +Real-time data ingestion with async persistence. + +```python +class DataIngestionPipeline: + """ + Handles real-time data ingestion from WebSocket sources. + Writes to cache immediately, persists to DB asynchronously. + """ + + def __init__(self, cache_manager, db_connection_pool): + self.cache = cache_manager + self.db = db_connection_pool + + # Batch write buffers + self.ohlcv_buffer: List[Dict] = [] + self.orderbook_buffer: List[Dict] = [] + self.trade_buffer: List[Dict] = [] + + # Batch write settings + self.batch_size = 100 + self.batch_timeout_seconds = 5 + + async def ingest_ohlcv_candle(self, symbol: str, timeframe: str, candle: Dict): + """ + Ingest OHLCV candle. + 1. Add to cache immediately + 2. 
Buffer for batch write to DB + """ + # Immediate cache write + self.cache.add_ohlcv_candle(symbol, timeframe, candle) + + # Buffer for DB write + self.ohlcv_buffer.append({ + 'symbol': symbol, + 'timeframe': timeframe, + **candle + }) + + # Flush if buffer full + if len(self.ohlcv_buffer) >= self.batch_size: + await self._flush_ohlcv_buffer() + + async def ingest_orderbook_snapshot(self, symbol: str, snapshot: Dict): + """Ingest order book snapshot""" + # Immediate cache write + self.cache.add_orderbook_snapshot(symbol, snapshot) + + # Calculate and cache imbalances + imbalances = self._calculate_imbalances(symbol, snapshot) + self.cache.add_imbalance_data(symbol, imbalances) + + # Buffer for DB write + self.orderbook_buffer.append({ + 'symbol': symbol, + **snapshot + }) + + # Flush if buffer full + if len(self.orderbook_buffer) >= self.batch_size: + await self._flush_orderbook_buffer() + + async def _flush_ohlcv_buffer(self): + """Batch write OHLCV data to database""" + if not self.ohlcv_buffer: + return + + try: + # Prepare batch insert + values = [ + ( + item['timestamp'], + item['symbol'], + item['timeframe'], + item['open'], + item['high'], + item['low'], + item['close'], + item['volume'], + item.get('trade_count', 0) + ) + for item in self.ohlcv_buffer + ] + + # Batch insert + await self.db.executemany( + """ + INSERT INTO ohlcv_data + (timestamp, symbol, timeframe, open_price, high_price, + low_price, close_price, volume, trade_count) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (timestamp, symbol, timeframe) DO UPDATE + SET close_price = EXCLUDED.close_price, + high_price = GREATEST(ohlcv_data.high_price, EXCLUDED.high_price), + low_price = LEAST(ohlcv_data.low_price, EXCLUDED.low_price), + volume = ohlcv_data.volume + EXCLUDED.volume, + trade_count = ohlcv_data.trade_count + EXCLUDED.trade_count + """, + values + ) + + # Clear buffer + self.ohlcv_buffer.clear() + + except Exception as e: + logger.error(f"Error flushing OHLCV buffer: {e}") +``` + +### 6. Migration System + +Migrate existing Parquet/pickle data to TimescaleDB. + +```python +class DataMigrationManager: + """ + Migrates existing data from Parquet/pickle files to TimescaleDB. + Ensures data integrity and provides rollback capability. 
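+
+    Illustrative invocation (assumes an asyncpg pool `pool` and the directory
+    holding the legacy Parquet files; Path('cache') is a placeholder path):
+
+        migrator = DataMigrationManager(db_connection_pool=pool,
+                                        cache_dir=Path('cache'))
+        await migrator.migrate_all_data()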
+ """ + + def __init__(self, db_connection_pool, cache_dir: Path): + self.db = db_connection_pool + self.cache_dir = cache_dir + + async def migrate_all_data(self): + """Migrate all existing data to TimescaleDB""" + logger.info("Starting data migration to TimescaleDB") + + # Migrate OHLCV data from Parquet files + await self._migrate_ohlcv_data() + + # Migrate order book data if exists + await self._migrate_orderbook_data() + + # Verify migration + await self._verify_migration() + + logger.info("Data migration completed successfully") + + async def _migrate_ohlcv_data(self): + """Migrate OHLCV data from Parquet files""" + parquet_files = list(self.cache_dir.glob("*.parquet")) + + for parquet_file in parquet_files: + try: + # Parse filename: ETHUSDT_1m.parquet + filename = parquet_file.stem + parts = filename.split('_') + + if len(parts) != 2: + continue + + symbol_raw = parts[0] + timeframe = parts[1] + + # Convert symbol format + symbol = self._convert_symbol_format(symbol_raw) + + # Read Parquet file + df = pd.read_parquet(parquet_file) + + # Migrate data in batches + await self._migrate_ohlcv_batch(symbol, timeframe, df) + + logger.info(f"Migrated {len(df)} rows from {parquet_file.name}") + + except Exception as e: + logger.error(f"Error migrating {parquet_file}: {e}") + + async def _migrate_ohlcv_batch(self, symbol: str, timeframe: str, df: pd.DataFrame): + """Migrate a batch of OHLCV data""" + # Prepare data for insertion + values = [] + for idx, row in df.iterrows(): + values.append(( + row['timestamp'], + symbol, + timeframe, + row['open'], + row['high'], + row['low'], + row['close'], + row['volume'], + row.get('trade_count', 0) + )) + + # Batch insert + await self.db.executemany( + """ + INSERT INTO ohlcv_data + (timestamp, symbol, timeframe, open_price, high_price, + low_price, close_price, volume, trade_count) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (timestamp, symbol, timeframe) DO NOTHING + """, + values + ) +``` + +## Error Handling + +### Data Validation + +```python +class DataValidator: + """Validates all incoming data before storage""" + + @staticmethod + def validate_ohlcv(candle: Dict) -> bool: + """Validate OHLCV candle data""" + try: + # Check required fields + required = ['timestamp', 'open', 'high', 'low', 'close', 'volume'] + if not all(field in candle for field in required): + return False + + # Validate OHLC relationships + if candle['high'] < candle['low']: + logger.warning(f"Invalid OHLCV: high < low") + return False + + if candle['high'] < candle['open'] or candle['high'] < candle['close']: + logger.warning(f"Invalid OHLCV: high < open/close") + return False + + if candle['low'] > candle['open'] or candle['low'] > candle['close']: + logger.warning(f"Invalid OHLCV: low > open/close") + return False + + # Validate positive volume + if candle['volume'] < 0: + logger.warning(f"Invalid OHLCV: negative volume") + return False + + return True + + except Exception as e: + logger.error(f"Error validating OHLCV: {e}") + return False + + @staticmethod + def validate_orderbook(orderbook: Dict) -> bool: + """Validate order book data""" + try: + # Check required fields + if 'bids' not in orderbook or 'asks' not in orderbook: + return False + + # Validate bid/ask relationship + if orderbook['bids'] and orderbook['asks']: + best_bid = max(bid[0] for bid in orderbook['bids']) + best_ask = min(ask[0] for ask in orderbook['asks']) + + if best_bid >= best_ask: + logger.warning(f"Invalid orderbook: bid >= ask") + return False + + return True + + except Exception 
as e: + logger.error(f"Error validating orderbook: {e}") + return False +``` + +### Retry Logic + +```python +class RetryableDBOperation: + """Wrapper for database operations with retry logic""" + + @staticmethod + async def execute_with_retry( + operation: Callable, + max_retries: int = 3, + backoff_seconds: float = 1.0 + ): + """Execute database operation with exponential backoff retry""" + for attempt in range(max_retries): + try: + return await operation() + except Exception as e: + if attempt == max_retries - 1: + logger.error(f"Operation failed after {max_retries} attempts: {e}") + raise + + wait_time = backoff_seconds * (2 ** attempt) + logger.warning(f"Operation failed (attempt {attempt + 1}), retrying in {wait_time}s: {e}") + await asyncio.sleep(wait_time) +``` + +## Testing Strategy + +### Unit Tests + +1. **Data Validation Tests** + - Test OHLCV validation logic + - Test order book validation logic + - Test timestamp validation and timezone handling + +2. **Cache Manager Tests** + - Test cache insertion and retrieval + - Test cache eviction logic + - Test cache hit/miss statistics + +3. **Data Model Tests** + - Test InferenceDataFrame creation + - Test OrderBookDataFrame creation + - Test data serialization/deserialization + +### Integration Tests + +1. **Database Integration Tests** + - Test TimescaleDB connection and queries + - Test batch insert operations + - Test continuous aggregates + - Test compression and retention policies + +2. **End-to-End Data Flow Tests** + - Test real-time data ingestion → cache → database + - Test historical data retrieval from database + - Test multi-timeframe data alignment + +3. **Migration Tests** + - Test Parquet file migration + - Test data integrity after migration + - Test rollback capability + +### Performance Tests + +1. **Latency Tests** + - Cache read latency (<10ms target) + - Database query latency (<100ms target) + - Batch write throughput (>1000 ops/sec target) + +2. **Load Tests** + - Concurrent read/write operations + - High-frequency data ingestion + - Large time-range queries + +3. **Storage Tests** + - Compression ratio validation (>80% target) + - Storage growth over time + - Query performance with compressed data + +## Performance Optimization + +### Query Optimization + +```sql +-- Use time_bucket for efficient time-range queries +SELECT + time_bucket('1 minute', timestamp) AS bucket, + symbol, + first(close_price, timestamp) AS price +FROM ohlcv_data +WHERE symbol = 'ETH/USDT' + AND timeframe = '1s' + AND timestamp >= NOW() - INTERVAL '1 hour' +GROUP BY bucket, symbol +ORDER BY bucket DESC; + +-- Use indexes for symbol-based queries +CREATE INDEX CONCURRENTLY idx_ohlcv_symbol_tf_ts +ON ohlcv_data (symbol, timeframe, timestamp DESC); +``` + +### Caching Strategy + +1. **Hot Data**: Last 5 minutes in memory (all symbols, all timeframes) +2. **Warm Data**: Last 1 hour in TimescaleDB uncompressed +3. **Cold Data**: Older than 1 hour in TimescaleDB compressed + +### Batch Operations + +- Batch size: 100 records or 5 seconds (whichever comes first) +- Use `executemany()` for bulk inserts +- Use `COPY` command for large migrations + +## Deployment Considerations + +### Database Setup + +1. Install TimescaleDB extension +2. Run schema creation scripts +3. Create hypertables and indexes +4. Set up continuous aggregates +5. Configure compression and retention policies + +### Migration Process + +1. **Phase 1**: Deploy new code with dual-write (Parquet + TimescaleDB) +2. 
**Phase 2**: Run migration script to backfill historical data +3. **Phase 3**: Verify data integrity +4. **Phase 4**: Switch reads to TimescaleDB +5. **Phase 5**: Deprecate Parquet writes +6. **Phase 6**: Archive old Parquet files + +### Monitoring + +1. **Database Metrics** + - Query latency (p50, p95, p99) + - Write throughput + - Storage size and compression ratio + - Connection pool utilization + +2. **Cache Metrics** + - Hit/miss ratio + - Cache size + - Eviction rate + +3. **Application Metrics** + - Data retrieval latency + - Error rates + - Data validation failures + +## Security Considerations + +1. **Database Access** + - Use connection pooling with proper credentials + - Implement read-only users for query-only operations + - Use SSL/TLS for database connections + +2. **Data Validation** + - Validate all incoming data before storage + - Sanitize inputs to prevent SQL injection + - Implement rate limiting for API endpoints + +3. **Backup and Recovery** + - Regular database backups (daily) + - Point-in-time recovery capability + - Disaster recovery plan + +## Future Enhancements + +1. **Multi-Exchange Support** + - Store data from multiple exchanges + - Cross-exchange arbitrage analysis + - Exchange-specific data normalization + +2. **Advanced Analytics** + - Real-time pattern detection + - Anomaly detection + - Predictive analytics + +3. **Distributed Storage** + - Horizontal scaling with TimescaleDB clustering + - Read replicas for query load distribution + - Geographic distribution for low-latency access diff --git a/.kiro/specs/unified-data-storage/requirements.md b/.kiro/specs/unified-data-storage/requirements.md new file mode 100644 index 0000000..5be8589 --- /dev/null +++ b/.kiro/specs/unified-data-storage/requirements.md @@ -0,0 +1,134 @@ +# Requirements Document + +## Introduction + +This feature aims to unify all data storage and retrieval methods across the trading system into a single, coherent interface. Currently, the system uses multiple storage approaches (Parquet files, pickle files, in-memory caches, TimescaleDB) and has fragmented data access patterns. This creates complexity, inconsistency, and performance issues. + +The unified data storage system will provide a single endpoint for retrieving inference data, supporting both real-time streaming data and historical backtesting/annotation scenarios. It will consolidate storage methods into the most efficient approach and ensure all components use consistent data access patterns. + +## Requirements + +### Requirement 1: Unified Data Retrieval Interface + +**User Story:** As a developer, I want a single method to retrieve inference data regardless of whether I need real-time or historical data, so that I can simplify my code and ensure consistency. + +#### Acceptance Criteria + +1. WHEN a component requests inference data THEN the system SHALL provide a unified `get_inference_data()` method that accepts a timestamp parameter +2. WHEN timestamp is None or "latest" THEN the system SHALL return the most recent cached real-time data +3. WHEN timestamp is a specific datetime THEN the system SHALL return historical data from local storage at that timestamp +4. WHEN requesting inference data THEN the system SHALL return data in a standardized format with all required features (OHLCV, technical indicators, COB data, order book imbalances) +5. 
WHEN the requested timestamp is not available THEN the system SHALL return the nearest available data point with a warning + +### Requirement 2: Consolidated Storage Backend + +**User Story:** As a system architect, I want all market data stored using a single, optimized storage method, so that I can reduce complexity and improve performance. + +#### Acceptance Criteria + +1. WHEN storing candlestick data THEN the system SHALL use TimescaleDB as the primary storage backend +2. WHEN storing raw order book ticks THEN the system SHALL use TimescaleDB with appropriate compression +3. WHEN storing aggregated 1s/1m data THEN the system SHALL use TimescaleDB hypertables for efficient time-series queries +4. WHEN the system starts THEN it SHALL migrate existing Parquet and pickle files to TimescaleDB +5. WHEN data is written THEN the system SHALL ensure atomic writes with proper error handling +6. WHEN querying data THEN the system SHALL leverage TimescaleDB's time-series optimizations for fast retrieval + +### Requirement 3: Multi-Timeframe Data Storage + +**User Story:** As a trading model, I need access to multiple timeframes (1s, 1m, 5m, 15m, 1h, 1d) of candlestick data, so that I can perform multi-timeframe analysis. + +#### Acceptance Criteria + +1. WHEN storing candlestick data THEN the system SHALL store all configured timeframes (1s, 1m, 5m, 15m, 1h, 1d) +2. WHEN aggregating data THEN the system SHALL use TimescaleDB continuous aggregates to automatically generate higher timeframes from 1s data +3. WHEN requesting multi-timeframe data THEN the system SHALL return aligned timestamps across all timeframes +4. WHEN a timeframe is missing data THEN the system SHALL generate it from lower timeframes if available +5. WHEN storing timeframe data THEN the system SHALL maintain at least 1500 candles per timeframe for each symbol + +### Requirement 4: Raw Order Book and Trade Data Storage + +**User Story:** As a machine learning model, I need access to raw 1s and 1m aggregated order book and trade book data, so that I can analyze market microstructure. + +#### Acceptance Criteria + +1. WHEN receiving order book updates THEN the system SHALL store raw ticks in TimescaleDB with full bid/ask depth +2. WHEN aggregating order book data THEN the system SHALL create 1s aggregations with $1 price buckets +3. WHEN aggregating order book data THEN the system SHALL create 1m aggregations with $10 price buckets +4. WHEN storing trade data THEN the system SHALL store individual trades with price, size, side, and timestamp +5. WHEN storing order book data THEN the system SHALL maintain 30 minutes of raw data and 24 hours of aggregated data +6. WHEN querying order book data THEN the system SHALL provide efficient access to imbalance metrics across multiple timeframes (1s, 5s, 15s, 60s) + +### Requirement 5: Real-Time Data Caching + +**User Story:** As a real-time trading system, I need low-latency access to the latest market data, so that I can make timely trading decisions. + +#### Acceptance Criteria + +1. WHEN receiving real-time data THEN the system SHALL maintain an in-memory cache of the last 5 minutes of data +2. WHEN requesting latest data THEN the system SHALL serve from cache with <10ms latency +3. WHEN cache is updated THEN the system SHALL asynchronously persist to TimescaleDB without blocking +4. WHEN cache reaches capacity THEN the system SHALL evict oldest data while maintaining continuity +5. 
WHEN system restarts THEN the system SHALL rebuild cache from TimescaleDB automatically + +### Requirement 6: Historical Data Access for Backtesting + +**User Story:** As a backtesting system, I need efficient access to historical data at any timestamp, so that I can simulate trading strategies accurately. + +#### Acceptance Criteria + +1. WHEN requesting historical data THEN the system SHALL query TimescaleDB with timestamp-based indexing +2. WHEN requesting a time range THEN the system SHALL return all data points within that range efficiently +3. WHEN requesting data with context window THEN the system SHALL return ±N minutes of surrounding data +4. WHEN backtesting THEN the system SHALL support sequential data access without loading entire dataset into memory +5. WHEN querying historical data THEN the system SHALL return results in <100ms for typical queries (single timestamp, single symbol) + +### Requirement 7: Data Annotation Support + +**User Story:** As a data annotator, I need to retrieve historical market data at specific timestamps to manually label trading signals, so that I can create training datasets. + +#### Acceptance Criteria + +1. WHEN annotating data THEN the system SHALL provide the same `get_inference_data()` interface with timestamp parameter +2. WHEN retrieving annotation data THEN the system SHALL include ±5 minutes of context data +3. WHEN loading annotation sessions THEN the system SHALL support efficient random access to any timestamp +4. WHEN displaying charts THEN the system SHALL provide multi-timeframe data aligned to the annotation timestamp +5. WHEN saving annotations THEN the system SHALL link annotations to exact timestamps in the database + +### Requirement 8: Data Migration and Backward Compatibility + +**User Story:** As a system administrator, I want existing data migrated to the new storage system without data loss, so that I can maintain historical continuity. + +#### Acceptance Criteria + +1. WHEN migration starts THEN the system SHALL detect existing Parquet files in cache directory +2. WHEN migrating Parquet data THEN the system SHALL import all data into TimescaleDB with proper timestamps +3. WHEN migration completes THEN the system SHALL verify data integrity by comparing record counts +4. WHEN migration fails THEN the system SHALL rollback changes and preserve original files +5. WHEN migration succeeds THEN the system SHALL optionally archive old Parquet files +6. WHEN accessing data during migration THEN the system SHALL continue serving from existing storage + +### Requirement 9: Performance and Scalability + +**User Story:** As a system operator, I need the data storage system to handle high-frequency data ingestion and queries efficiently, so that the system remains responsive under load. + +#### Acceptance Criteria + +1. WHEN ingesting real-time data THEN the system SHALL handle at least 1000 updates per second per symbol +2. WHEN querying data THEN the system SHALL return single-timestamp queries in <100ms +3. WHEN querying time ranges THEN the system SHALL return 1 hour of 1s data in <500ms +4. WHEN storing data THEN the system SHALL use batch writes to optimize database performance +5. WHEN database grows THEN the system SHALL use TimescaleDB compression to reduce storage size by 80%+ +6. 
WHEN running multiple queries THEN the system SHALL support concurrent access without performance degradation + +### Requirement 10: Data Consistency and Validation + +**User Story:** As a trading system, I need to ensure all data is consistent and validated, so that models receive accurate information. + +#### Acceptance Criteria + +1. WHEN storing data THEN the system SHALL validate timestamps are in UTC timezone +2. WHEN storing OHLCV data THEN the system SHALL validate high >= low and high >= open/close +3. WHEN storing order book data THEN the system SHALL validate bids < asks +4. WHEN detecting invalid data THEN the system SHALL log warnings and reject the data point +5. WHEN querying data THEN the system SHALL ensure all timeframes are properly aligned +6. WHEN data gaps exist THEN the system SHALL identify and log missing periods diff --git a/.kiro/specs/unified-data-storage/tasks.md b/.kiro/specs/unified-data-storage/tasks.md new file mode 100644 index 0000000..9490dd2 --- /dev/null +++ b/.kiro/specs/unified-data-storage/tasks.md @@ -0,0 +1,286 @@ +# Implementation Plan + +- [x] 1. Set up TimescaleDB schema and infrastructure + + + + - Create database schema with hypertables for OHLCV, order book, and trade data + - Implement continuous aggregates for multi-timeframe data generation + - Configure compression and retention policies + - Create all necessary indexes for query optimization + + + - _Requirements: 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 3.1, 3.2, 3.3, 3.4, 3.5, 4.1, 4.2, 4.3, 4.4, 4.5, 4.6_ + +- [ ] 2. Implement data models and validation + - [ ] 2.1 Create InferenceDataFrame and OrderBookDataFrame data classes + - Write dataclasses for standardized data structures + - Include all required fields (OHLCV, order book, imbalances, indicators) + - Add serialization/deserialization methods + - _Requirements: 1.4, 10.1, 10.2, 10.3_ + + - [ ] 2.2 Implement DataValidator class + - Write OHLCV validation logic (high >= low, positive volume) + - Write order book validation logic (bids < asks) + - Write timestamp validation and UTC timezone enforcement + - Add comprehensive error logging for validation failures + - _Requirements: 10.1, 10.2, 10.3, 10.4_ + + - [ ]* 2.3 Write unit tests for data models and validation + - Test InferenceDataFrame creation and serialization + - Test OrderBookDataFrame creation and serialization + - Test DataValidator with valid and invalid data + - Test edge cases and boundary conditions + - _Requirements: 10.1, 10.2, 10.3, 10.4_ + +- [ ] 3. Implement cache layer + - [ ] 3.1 Create DataCacheManager class + - Implement in-memory cache with deque structures + - Add methods for OHLCV, order book, and imbalance data + - Implement cache eviction logic (5-minute rolling window) + - Add cache statistics tracking (hits, misses) + - _Requirements: 5.1, 5.2, 5.3, 5.4_ + + - [ ] 3.2 Implement cache retrieval methods + - Write get_latest_ohlcv() with timeframe support + - Write get_latest_orderbook() for current snapshot + - Write get_latest_imbalances() for multi-timeframe metrics + - Ensure <10ms latency for cache reads + - _Requirements: 5.1, 5.2_ + + - [ ]* 3.3 Write unit tests for cache layer + - Test cache insertion and retrieval + - Test cache eviction logic + - Test cache statistics + - Test concurrent access patterns + - _Requirements: 5.1, 5.2, 5.3, 5.4_ + +- [ ] 4. 
Implement database connection and query layer + - [ ] 4.1 Create DatabaseConnectionManager class + - Implement asyncpg connection pool management + - Add health monitoring and automatic reconnection + - Configure connection pool settings (min/max connections) + - Add connection statistics and logging + - _Requirements: 2.1, 2.5, 9.6_ + + - [ ] 4.2 Implement OHLCV query methods + - Write query_ohlcv_data() for single timeframe retrieval + - Write query_multi_timeframe_ohlcv() for aligned multi-timeframe data + - Optimize queries with time_bucket and proper indexes + - Ensure <100ms query latency for typical queries + - _Requirements: 3.1, 3.2, 3.3, 3.4, 6.1, 6.2, 6.5, 9.2, 9.3_ + + - [ ] 4.3 Implement order book query methods + - Write query_orderbook_snapshots() for raw order book data + - Write query_orderbook_aggregated() for 1s/1m aggregations + - Write query_orderbook_imbalances() for multi-timeframe imbalances + - Optimize queries for fast retrieval + - _Requirements: 4.1, 4.2, 4.3, 4.6, 6.1, 6.2, 6.5_ + + - [ ]* 4.4 Write integration tests for database layer + - Test connection pool management + - Test OHLCV queries with various time ranges + - Test order book queries + - Test query performance and latency + - _Requirements: 6.1, 6.2, 6.5, 9.2, 9.3_ + +- [ ] 5. Implement data ingestion pipeline + - [ ] 5.1 Create DataIngestionPipeline class + - Implement batch write buffers for OHLCV, order book, and trade data + - Add batch size and timeout configuration + - Implement async batch flush methods + - Add error handling and retry logic + - _Requirements: 2.5, 5.3, 9.1, 9.4_ + + - [ ] 5.2 Implement OHLCV ingestion + - Write ingest_ohlcv_candle() method + - Add immediate cache write + - Implement batch buffering for database writes + - Add data validation before ingestion + - _Requirements: 2.1, 2.2, 2.5, 5.1, 5.3, 9.1, 9.4, 10.1, 10.2_ + + - [ ] 5.3 Implement order book ingestion + - Write ingest_orderbook_snapshot() method + - Calculate and cache imbalance metrics + - Implement batch buffering for database writes + - Add data validation before ingestion + - _Requirements: 2.1, 2.2, 4.1, 4.2, 4.3, 5.1, 5.3, 9.1, 9.4, 10.3_ + + - [ ] 5.4 Implement retry logic and error handling + - Create RetryableDBOperation wrapper class + - Implement exponential backoff retry strategy + - Add comprehensive error logging + - Handle database connection failures gracefully + - _Requirements: 2.5, 9.6_ + + - [ ]* 5.5 Write integration tests for ingestion pipeline + - Test OHLCV ingestion flow (cache → database) + - Test order book ingestion flow + - Test batch write operations + - Test error handling and retry logic + - _Requirements: 2.5, 5.3, 9.1, 9.4_ + +- [ ] 6. 
Implement unified data provider API + - [ ] 6.1 Create UnifiedDataProvider class + - Initialize with database connection pool and cache manager + - Configure symbols and timeframes + - Add connection to existing DataProvider components + - _Requirements: 1.1, 1.2, 1.3_ + + - [ ] 6.2 Implement get_inference_data() method + - Handle timestamp=None for real-time data from cache + - Handle specific timestamp for historical data from database + - Implement context window retrieval (±N minutes) + - Combine OHLCV, order book, and imbalance data + - Return standardized InferenceDataFrame + - _Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 5.2, 6.1, 6.2, 6.3, 6.4, 7.1, 7.2, 7.3_ + + - [ ] 6.3 Implement get_multi_timeframe_data() method + - Query multiple timeframes efficiently + - Align timestamps across timeframes + - Handle missing data by generating from lower timeframes + - Return dictionary mapping timeframe to DataFrame + - _Requirements: 3.1, 3.2, 3.3, 3.4, 6.1, 6.2, 6.3, 10.5_ + + - [ ] 6.4 Implement get_order_book_data() method + - Handle different aggregation levels (raw, 1s, 1m) + - Include multi-timeframe imbalance metrics + - Return standardized OrderBookDataFrame + - _Requirements: 4.1, 4.2, 4.3, 4.6, 6.1, 6.2_ + + - [ ]* 6.5 Write integration tests for unified API + - Test get_inference_data() with real-time and historical data + - Test get_multi_timeframe_data() with various timeframes + - Test get_order_book_data() with different aggregations + - Test context window retrieval + - Test data consistency across methods + - _Requirements: 1.1, 1.2, 1.3, 1.4, 1.5, 6.1, 6.2, 6.3, 6.4, 10.5, 10.6_ + +- [ ] 7. Implement data migration system + - [ ] 7.1 Create DataMigrationManager class + - Initialize with database connection and cache directory path + - Add methods for discovering existing Parquet files + - Implement symbol format conversion utilities + - _Requirements: 8.1, 8.2, 8.6_ + + - [ ] 7.2 Implement Parquet file migration + - Write _migrate_ohlcv_data() to process all Parquet files + - Parse filenames to extract symbol and timeframe + - Read Parquet files and convert to database format + - Implement batch insertion with conflict handling + - _Requirements: 8.1, 8.2, 8.3, 8.5_ + + - [ ] 7.3 Implement migration verification + - Write _verify_migration() to compare record counts + - Check data integrity (no missing timestamps) + - Validate data ranges match original files + - Generate migration report + - _Requirements: 8.3, 8.4_ + + - [ ] 7.4 Implement rollback capability + - Add transaction support for migration operations + - Implement rollback on verification failure + - Preserve original Parquet files until verification passes + - Add option to archive old files after successful migration + - _Requirements: 8.4, 8.5_ + + - [ ]* 7.5 Write integration tests for migration + - Test Parquet file discovery and parsing + - Test data migration with sample files + - Test verification logic + - Test rollback on failure + - _Requirements: 8.1, 8.2, 8.3, 8.4_ + +- [ ] 8. 
Integrate with existing DataProvider + - [ ] 8.1 Update DataProvider class to use UnifiedDataProvider + - Replace existing data retrieval methods with unified API calls + - Update get_data() method to use get_inference_data() + - Update multi-timeframe methods to use get_multi_timeframe_data() + - Maintain backward compatibility with existing interfaces + - _Requirements: 1.1, 1.2, 1.3, 8.6_ + + - [ ] 8.2 Update real-time data flow + - Connect WebSocket data to DataIngestionPipeline + - Update tick aggregator to write to cache and database + - Update COB integration to use new ingestion methods + - Ensure no data loss during transition + - _Requirements: 2.1, 2.2, 5.1, 5.3, 8.6_ + + - [ ] 8.3 Update annotation system integration + - Update ANNOTATE/core/data_loader.py to use unified API + - Ensure annotation system uses get_inference_data() with timestamps + - Test annotation workflow with new data provider + - _Requirements: 7.1, 7.2, 7.3, 7.4, 7.5_ + + - [ ] 8.4 Update backtesting system integration + - Update backtesting data access to use unified API + - Ensure sequential data access works efficiently + - Test backtesting performance with new data provider + - _Requirements: 6.1, 6.2, 6.3, 6.4, 6.5_ + + - [ ]* 8.5 Write end-to-end integration tests + - Test complete data flow: WebSocket → ingestion → cache → database → retrieval + - Test annotation system with unified data provider + - Test backtesting system with unified data provider + - Test real-time trading with unified data provider + - _Requirements: 1.1, 1.2, 1.3, 6.1, 6.2, 7.1, 8.6_ + +- [ ] 9. Performance optimization and monitoring + - [ ] 9.1 Implement performance monitoring + - Add latency tracking for cache reads (<10ms target) + - Add latency tracking for database queries (<100ms target) + - Add throughput monitoring for ingestion (>1000 ops/sec target) + - Create performance dashboard or logging + - _Requirements: 5.2, 6.5, 9.1, 9.2, 9.3_ + + - [ ] 9.2 Optimize database queries + - Analyze query execution plans + - Add missing indexes if needed + - Optimize time_bucket usage + - Implement query result caching where appropriate + - _Requirements: 6.5, 9.2, 9.3, 9.6_ + + - [ ] 9.3 Implement compression and retention + - Verify compression policies are working (>80% compression target) + - Monitor storage growth over time + - Verify retention policies are cleaning old data + - Add alerts for storage issues + - _Requirements: 2.6, 9.5_ + + - [ ]* 9.4 Write performance tests + - Test cache read latency under load + - Test database query latency with various time ranges + - Test ingestion throughput with high-frequency data + - Test concurrent access patterns + - _Requirements: 5.2, 6.5, 9.1, 9.2, 9.3, 9.6_ + +- [ ] 10. 
Documentation and deployment + - [ ] 10.1 Create deployment documentation + - Document TimescaleDB setup and configuration + - Document migration process and steps + - Document rollback procedures + - Create troubleshooting guide + - _Requirements: 8.1, 8.2, 8.3, 8.4, 8.5, 8.6_ + + - [ ] 10.2 Create API documentation + - Document UnifiedDataProvider API methods + - Provide usage examples for each method + - Document data models and structures + - Create migration guide for existing code + - _Requirements: 1.1, 1.2, 1.3, 1.4, 1.5_ + + - [ ] 10.3 Create monitoring and alerting setup + - Document key metrics to monitor + - Set up alerts for performance degradation + - Set up alerts for data validation failures + - Create operational runbook + - _Requirements: 9.1, 9.2, 9.3, 9.5, 9.6, 10.4_ + + - [ ] 10.4 Execute phased deployment + - Phase 1: Deploy with dual-write (Parquet + TimescaleDB) + - Phase 2: Run migration script for historical data + - Phase 3: Verify data integrity + - Phase 4: Switch reads to TimescaleDB + - Phase 5: Deprecate Parquet writes + - Phase 6: Archive old Parquet files + - _Requirements: 8.1, 8.2, 8.3, 8.4, 8.5, 8.6_ diff --git a/ANNOTATE/REALTIME_INFERENCE_GUIDE.md b/ANNOTATE/REALTIME_INFERENCE_GUIDE.md new file mode 100644 index 0000000..a402ddc --- /dev/null +++ b/ANNOTATE/REALTIME_INFERENCE_GUIDE.md @@ -0,0 +1,285 @@ +# Real-Time Inference Guide + +## 🎯 Overview + +Real-time inference mode runs your trained model on **live streaming data** from the DataProvider, updating charts every second and displaying model predictions in real-time. + +--- + +## 🚀 Starting Real-Time Inference + +### Step 1: Select Model +Choose the model you want to run from the dropdown in the training panel. + +### Step 2: Click "Start Live Inference" +- Button turns red: "Stop Inference" +- Live mode banner appears at top +- Charts begin updating every second +- Model predictions displayed + +### Visual Indicators +- **🔴 LIVE banner** at top of page +- **Green status box** in training panel +- **Update counter** showing number of updates +- **Signal markers** on charts (🔵 BUY, 🔴 SELL) + +--- + +## 📊 What Updates in Real-Time + +### Charts (Every 1 Second) +- **All 4 timeframes** update with latest data +- **Candlesticks** show new price action +- **Volume bars** update with new volume +- **Smooth updates** without page refresh + +### Model Signals +- **Latest prediction** displayed (BUY/SELL/HOLD) +- **Confidence level** shown as percentage +- **Signal markers** added to charts +- **Last 10 signals** kept visible + +### Data Source +- Uses **DataProvider's cached data** +- Same data as main trading system +- Updates from exchange feeds +- 1-second resolution + +--- + +## 🎨 Visual Elements + +### Live Mode Banner +``` +🔴 LIVE | Real-Time Inference Active +Charts updating with live data every second +[X updates] +``` + +### Signal Markers on Charts +- **🔵 BUY** - Green marker with arrow +- **🔴 SELL** - Red marker with arrow +- **Timestamp** - When signal was generated +- **Price** - Price at signal time + +### Training Panel Status +``` +🔴 LIVE +Signal: BUY +Confidence: 75.3% +Charts updating every 1s +``` + +--- + +## 🛑 Stopping Real-Time Inference + +### Click "Stop Inference" +- Live mode banner disappears +- Charts stop updating +- Signal markers remain visible +- Can review final signals + +### What Happens +- Inference loop terminates +- Chart updates stop +- Last 100 signals saved +- Model remains loaded + +--- + +## 📈 Monitoring Performance + +### Watch For +- **Signal frequency** - How 
often model signals +- **Confidence levels** - Higher is better (>70%) +- **Signal accuracy** - Do signals make sense? +- **False positives** - Signals that shouldn't happen + +### Good Signs +- ✅ Signals at key levels (support/resistance) +- ✅ High confidence (>70%) +- ✅ Signals match your analysis +- ✅ Few false positives + +### Warning Signs +- ⚠️ Too many signals (every second) +- ⚠️ Low confidence (<50%) +- ⚠️ Random signals +- ⚠️ Signals don't match patterns + +--- + +## 🔧 Technical Details + +### Update Frequency +- **Charts**: 1 second +- **Signals**: 1 second +- **Model inference**: 1 second + +### Data Flow +``` +DataProvider (Live Data) + ↓ +Latest Market State (4 timeframes) + ↓ +Model Inference + ↓ +Prediction (Action + Confidence) + ↓ +Update Charts + Display Signal +``` + +### Performance +- **Latency**: ~100-200ms per update +- **CPU Usage**: Moderate (model inference) +- **Memory**: Stable (no leaks) +- **Network**: Minimal (uses cached data) + +--- + +## 💡 Tips & Tricks + +### Tip 1: Watch Multiple Timeframes +All 4 charts update simultaneously. Watch for: +- Alignment across timeframes +- Divergences between timeframes +- Pattern confirmation + +### Tip 2: Monitor Confidence +- **>80%**: Very strong signal +- **70-80%**: Strong signal +- **50-70%**: Moderate signal +- **<50%**: Weak signal (ignore) + +### Tip 3: Compare with Annotations +- Do live signals match your annotations? +- Are signals at similar price levels? +- Is timing similar to your trades? + +### Tip 4: Test Different Models +- Try CNN vs DQN vs Transformer +- Compare signal quality +- Note which performs best + +### Tip 5: Use for Validation +- After training, test with live inference +- Verify model learned correctly +- Check for overfitting + +--- + +## 🐛 Troubleshooting + +### Charts Not Updating +**Issue**: Live mode active but charts frozen + +**Solutions**: +- Check browser console for errors +- Verify DataProvider has live data +- Refresh page and restart inference +- Check network tab for failed requests + +### No Signals Generated +**Issue**: Status shows "HOLD" constantly + +**Solutions**: +- Model may need more training +- Check model is loaded correctly +- Verify market conditions (model may correctly hold) +- Try different model + +### Signals Too Frequent +**Issue**: Signal every second + +**Solutions**: +- Model may be overtrained +- Need more negative examples in training +- Adjust confidence threshold +- Retrain with better annotations + +### Performance Issues +**Issue**: Browser slow/laggy + +**Solutions**: +- Close other tabs +- Reduce number of visible timeframes +- Stop inference when not needed +- Clear browser cache + +--- + +## 📊 Example Session + +### Scenario: Testing CNN After Training + +**1. Preparation** +- Trained CNN on 20 breakout annotations +- Model learned breakout patterns +- Ready to test on live data + +**2. Start Inference** +- Select "StandardizedCNN" +- Click "Start Live Inference" +- 🔴 LIVE banner appears +- Charts begin updating + +**3. Observation (5 minutes)** +- Charts update smoothly +- Model generates 2 BUY signals +- Both at resistance breakouts +- Confidence: 78% and 82% + +**4. Validation** +- Signals match training patterns +- Timing is precise +- No false positives +- Model learned correctly ✅ + +**5. 
Stop Inference** +- Click "Stop Inference" +- Review signal history +- Model performs well +- Ready for production + +--- + +## 🎯 Best Practices + +### Before Starting +- ✅ Train model first +- ✅ Verify model loaded +- ✅ Check DataProvider has data +- ✅ Close unnecessary tabs + +### During Inference +- ✅ Monitor all timeframes +- ✅ Note signal quality +- ✅ Check confidence levels +- ✅ Compare with your analysis + +### After Stopping +- ✅ Review signal history +- ✅ Note performance +- ✅ Identify improvements +- ✅ Adjust training if needed + +--- + +## 🚀 Summary + +Real-time inference provides: + +✅ **Live chart updates** (1/second) +✅ **Model predictions** in real-time +✅ **Signal markers** on charts +✅ **Confidence levels** displayed +✅ **Performance monitoring** built-in + +Use it to: +- **Validate training** - Check model learned correctly +- **Test models** - Compare different models +- **Monitor performance** - Track signal quality +- **Debug issues** - Identify problems + +**Result**: Confidence that your model works correctly before deploying to production! 🎯 diff --git a/ANNOTATE/web/templates/components/training_panel.html b/ANNOTATE/web/templates/components/training_panel.html index 62a995a..6e4d733 100644 --- a/ANNOTATE/web/templates/components/training_panel.html +++ b/ANNOTATE/web/templates/components/training_panel.html @@ -13,7 +13,7 @@ - +
@@ -71,7 +69,7 @@ Stop Inference
@@ -111,7 +109,7 @@ .then(data => { const modelSelect = document.getElementById('model-select'); modelSelect.innerHTML = ''; - + if (data.success && data.models.length > 0) { data.models.forEach(model => { const option = document.createElement('option'); @@ -132,202 +130,214 @@ modelSelect.innerHTML = ''; }); } - + // Load models when page loads if (document.readyState === 'loading') { document.addEventListener('DOMContentLoaded', loadAvailableModels); } else { loadAvailableModels(); } - + // Train model button - document.getElementById('train-model-btn').addEventListener('click', function() { + document.getElementById('train-model-btn').addEventListener('click', function () { const modelName = document.getElementById('model-select').value; - + if (appState.annotations.length === 0) { showError('No annotations available for training'); return; } - + // Get annotation IDs const annotationIds = appState.annotations.map(a => a.annotation_id); - + // Start training startTraining(modelName, annotationIds); }); - + function startTraining(modelName, annotationIds) { // Show training status document.getElementById('training-status').style.display = 'block'; document.getElementById('training-results').style.display = 'none'; document.getElementById('train-model-btn').disabled = true; - + // Reset progress document.getElementById('training-progress-bar').style.width = '0%'; document.getElementById('training-epoch').textContent = '0'; document.getElementById('training-loss').textContent = '--'; - + // Start training request fetch('/api/train-model', { method: 'POST', - headers: {'Content-Type': 'application/json'}, + headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ model_name: modelName, annotation_ids: annotationIds }) }) - .then(response => response.json()) - .then(data => { - if (data.success) { - // Start polling for training progress - pollTrainingProgress(data.training_id); - } else { - showError('Failed to start training: ' + data.error.message); - document.getElementById('training-status').style.display = 'none'; - document.getElementById('train-model-btn').disabled = false; - } - }) - .catch(error => { - showError('Network error: ' + error.message); - document.getElementById('training-status').style.display = 'none'; - document.getElementById('train-model-btn').disabled = false; - }); - } - - function pollTrainingProgress(trainingId) { - const pollInterval = setInterval(function() { - fetch('/api/training-progress', { - method: 'POST', - headers: {'Content-Type': 'application/json'}, - body: JSON.stringify({training_id: trainingId}) - }) .then(response => response.json()) .then(data => { if (data.success) { - const progress = data.progress; - - // Update progress bar - const percentage = (progress.current_epoch / progress.total_epochs) * 100; - document.getElementById('training-progress-bar').style.width = percentage + '%'; - document.getElementById('training-epoch').textContent = progress.current_epoch; - document.getElementById('training-total-epochs').textContent = progress.total_epochs; - document.getElementById('training-loss').textContent = progress.current_loss.toFixed(4); - - // Check if complete - if (progress.status === 'completed') { - clearInterval(pollInterval); - showTrainingResults(progress); - } else if (progress.status === 'failed') { - clearInterval(pollInterval); - showError('Training failed: ' + progress.error); - document.getElementById('training-status').style.display = 'none'; - document.getElementById('train-model-btn').disabled = false; - } + 
// Start polling for training progress + pollTrainingProgress(data.training_id); + } else { + showError('Failed to start training: ' + data.error.message); + document.getElementById('training-status').style.display = 'none'; + document.getElementById('train-model-btn').disabled = false; } }) .catch(error => { - clearInterval(pollInterval); - showError('Failed to get training progress: ' + error.message); + showError('Network error: ' + error.message); document.getElementById('training-status').style.display = 'none'; document.getElementById('train-model-btn').disabled = false; }); + } + + function pollTrainingProgress(trainingId) { + const pollInterval = setInterval(function () { + fetch('/api/training-progress', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ training_id: trainingId }) + }) + .then(response => response.json()) + .then(data => { + if (data.success) { + const progress = data.progress; + + // Update progress bar + const percentage = (progress.current_epoch / progress.total_epochs) * 100; + document.getElementById('training-progress-bar').style.width = percentage + '%'; + document.getElementById('training-epoch').textContent = progress.current_epoch; + document.getElementById('training-total-epochs').textContent = progress.total_epochs; + document.getElementById('training-loss').textContent = progress.current_loss.toFixed(4); + + // Check if complete + if (progress.status === 'completed') { + clearInterval(pollInterval); + showTrainingResults(progress); + } else if (progress.status === 'failed') { + clearInterval(pollInterval); + showError('Training failed: ' + progress.error); + document.getElementById('training-status').style.display = 'none'; + document.getElementById('train-model-btn').disabled = false; + } + } + }) + .catch(error => { + clearInterval(pollInterval); + showError('Failed to get training progress: ' + error.message); + document.getElementById('training-status').style.display = 'none'; + document.getElementById('train-model-btn').disabled = false; + }); }, 1000); // Poll every second } - + function showTrainingResults(results) { // Hide training status document.getElementById('training-status').style.display = 'none'; - + // Show results document.getElementById('training-results').style.display = 'block'; document.getElementById('result-loss').textContent = results.final_loss.toFixed(4); document.getElementById('result-accuracy').textContent = (results.accuracy * 100).toFixed(2) + '%'; document.getElementById('result-duration').textContent = results.duration_seconds.toFixed(1) + 's'; - + // Update last training time document.getElementById('last-training-time').textContent = new Date().toLocaleTimeString(); - + // Re-enable train button document.getElementById('train-model-btn').disabled = false; - + showSuccess('Training completed successfully'); } - + // Real-time inference controls let currentInferenceId = null; let signalPollInterval = null; - - document.getElementById('start-inference-btn').addEventListener('click', function() { + + document.getElementById('start-inference-btn').addEventListener('click', function () { const modelName = document.getElementById('model-select').value; - + if (!modelName) { showError('Please select a model first'); return; } - + // Start real-time inference fetch('/api/realtime-inference/start', { method: 'POST', - headers: {'Content-Type': 'application/json'}, + headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ model_name: modelName, symbol: 
appState.currentSymbol }) }) - .then(response => response.json()) - .then(data => { - if (data.success) { - currentInferenceId = data.inference_id; - - // Update UI - document.getElementById('start-inference-btn').style.display = 'none'; - document.getElementById('stop-inference-btn').style.display = 'block'; - document.getElementById('inference-status').style.display = 'block'; - - // Start polling for signals - startSignalPolling(); - - showSuccess('Real-time inference started'); - } else { - showError('Failed to start inference: ' + data.error.message); - } - }) - .catch(error => { - showError('Network error: ' + error.message); - }); + .then(response => response.json()) + .then(data => { + if (data.success) { + currentInferenceId = data.inference_id; + + // Update UI + document.getElementById('start-inference-btn').style.display = 'none'; + document.getElementById('stop-inference-btn').style.display = 'block'; + document.getElementById('inference-status').style.display = 'block'; + + // Show live mode banner + const banner = document.getElementById('live-mode-banner'); + if (banner) { + banner.style.display = 'block'; + } + + // Start polling for signals + startSignalPolling(); + + showSuccess('Real-time inference started - Charts now updating live'); + } else { + showError('Failed to start inference: ' + data.error.message); + } + }) + .catch(error => { + showError('Network error: ' + error.message); + }); }); - - document.getElementById('stop-inference-btn').addEventListener('click', function() { + + document.getElementById('stop-inference-btn').addEventListener('click', function () { if (!currentInferenceId) return; - + // Stop real-time inference fetch('/api/realtime-inference/stop', { method: 'POST', - headers: {'Content-Type': 'application/json'}, - body: JSON.stringify({inference_id: currentInferenceId}) + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ inference_id: currentInferenceId }) }) - .then(response => response.json()) - .then(data => { - if (data.success) { - // Update UI - document.getElementById('start-inference-btn').style.display = 'block'; - document.getElementById('stop-inference-btn').style.display = 'none'; - document.getElementById('inference-status').style.display = 'none'; - - // Stop polling - stopSignalPolling(); - - currentInferenceId = null; - showSuccess('Real-time inference stopped'); - } - }) - .catch(error => { - showError('Network error: ' + error.message); - }); + .then(response => response.json()) + .then(data => { + if (data.success) { + // Update UI + document.getElementById('start-inference-btn').style.display = 'block'; + document.getElementById('stop-inference-btn').style.display = 'none'; + document.getElementById('inference-status').style.display = 'none'; + + // Hide live mode banner + const banner = document.getElementById('live-mode-banner'); + if (banner) { + banner.style.display = 'none'; + } + + // Stop polling + stopSignalPolling(); + + currentInferenceId = null; + showSuccess('Real-time inference stopped'); + } + }) + .catch(error => { + showError('Network error: ' + error.message); + }); }); - + function startSignalPolling() { - signalPollInterval = setInterval(function() { + signalPollInterval = setInterval(function () { // Poll for signals fetch('/api/realtime-inference/signals') .then(response => response.json()) @@ -335,9 +345,9 @@ if (data.success && data.signals.length > 0) { const latest = data.signals[0]; document.getElementById('latest-signal').textContent = latest.action; - 
document.getElementById('latest-confidence').textContent = + document.getElementById('latest-confidence').textContent = (latest.confidence * 100).toFixed(1) + '%'; - + // Update chart with signal markers if (appState.chartManager) { displaySignalOnChart(latest); @@ -347,17 +357,17 @@ .catch(error => { console.error('Error polling signals:', error); }); - + // Update charts with latest data updateChartsWithLiveData(); }, 1000); // Poll every second } - + function updateChartsWithLiveData() { // Fetch latest chart data fetch('/api/chart-data', { method: 'POST', - headers: {'Content-Type': 'application/json'}, + headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ symbol: appState.currentSymbol, timeframes: appState.currentTimeframes, @@ -365,72 +375,85 @@ end_time: null }) }) - .then(response => response.json()) - .then(data => { - if (data.success && appState.chartManager) { - // Update each chart with new data - Object.keys(data.chart_data).forEach(timeframe => { - const chartData = data.chart_data[timeframe]; - if (appState.chartManager.charts[timeframe]) { - updateSingleChart(timeframe, chartData); - } - }); - } - }) - .catch(error => { - console.error('Error updating charts:', error); - }); + .then(response => response.json()) + .then(data => { + if (data.success && appState.chartManager) { + // Update each chart with new data + Object.keys(data.chart_data).forEach(timeframe => { + const chartData = data.chart_data[timeframe]; + if (appState.chartManager.charts[timeframe]) { + updateSingleChart(timeframe, chartData); + } + }); + } + }) + .catch(error => { + console.error('Error updating charts:', error); + }); } - + + let liveUpdateCount = 0; + function updateSingleChart(timeframe, newData) { const chart = appState.chartManager.charts[timeframe]; if (!chart) return; - - // Update candlestick data - Plotly.update(chart.plotId, { - x: [newData.timestamps], - open: [newData.open], - high: [newData.high], - low: [newData.low], - close: [newData.close] - }, {}, [0]); - - // Update volume data - const volumeColors = newData.close.map((close, i) => { - if (i === 0) return '#3b82f6'; - return close >= newData.open[i] ? '#10b981' : '#ef4444'; - }); - - Plotly.update(chart.plotId, { - x: [newData.timestamps], - y: [newData.volume], - 'marker.color': [volumeColors] - }, {}, [1]); + + try { + // Update candlestick data + Plotly.update(chart.plotId, { + x: [newData.timestamps], + open: [newData.open], + high: [newData.high], + low: [newData.low], + close: [newData.close] + }, {}, [0]); + + // Update volume data + const volumeColors = newData.close.map((close, i) => { + if (i === 0) return '#3b82f6'; + return close >= newData.open[i] ? 
'#10b981' : '#ef4444'; + }); + + Plotly.update(chart.plotId, { + x: [newData.timestamps], + y: [newData.volume], + 'marker.color': [volumeColors] + }, {}, [1]); + + // Update counter + liveUpdateCount++; + const counterEl = document.getElementById('live-update-count'); + if (counterEl) { + counterEl.textContent = liveUpdateCount + ' updates'; + } + } catch (error) { + console.error('Error updating chart:', timeframe, error); + } } - + function stopSignalPolling() { if (signalPollInterval) { clearInterval(signalPollInterval); signalPollInterval = null; } } - + function displaySignalOnChart(signal) { // Add signal marker to chart if (!appState.chartManager || !appState.chartManager.charts) return; - + // Add marker to all timeframe charts Object.keys(appState.chartManager.charts).forEach(timeframe => { const chart = appState.chartManager.charts[timeframe]; if (!chart) return; - + // Get current annotations const currentAnnotations = chart.element.layout.annotations || []; - + // Determine marker based on signal let markerText = ''; let markerColor = '#9ca3af'; - + if (signal.action === 'BUY') { markerText = '🔵 BUY'; markerColor = '#10b981'; @@ -440,7 +463,7 @@ } else { return; // Don't show HOLD signals } - + // Add new signal marker const newAnnotation = { x: signal.timestamp, @@ -460,25 +483,25 @@ borderpad: 4, opacity: 0.8 }; - + // Keep only last 10 signal markers - const signalAnnotations = currentAnnotations.filter(ann => + const signalAnnotations = currentAnnotations.filter(ann => ann.text && (ann.text.includes('BUY') || ann.text.includes('SELL')) ).slice(-9); - + // Combine with existing non-signal annotations - const otherAnnotations = currentAnnotations.filter(ann => + const otherAnnotations = currentAnnotations.filter(ann => !ann.text || (!ann.text.includes('BUY') && !ann.text.includes('SELL')) ); - + const allAnnotations = [...otherAnnotations, ...signalAnnotations, newAnnotation]; - + // Update chart Plotly.relayout(chart.plotId, { annotations: allAnnotations }); }); - + console.log('Signal displayed:', signal.action, '@', signal.price); } - + \ No newline at end of file diff --git a/core/unified_data_models.py b/core/unified_data_models.py new file mode 100644 index 0000000..e69de29 diff --git a/core/unified_storage_schema.py b/core/unified_storage_schema.py new file mode 100644 index 0000000..f7a0c0b --- /dev/null +++ b/core/unified_storage_schema.py @@ -0,0 +1,553 @@ +""" +Unified Data Storage Schema Manager +Extends COBY schema with additional tables for unified data storage system. +""" + +import logging +from typing import Dict, List, Optional +from datetime import datetime +import asyncpg + +logger = logging.getLogger(__name__) + + +class UnifiedStorageSchemaManager: + """ + Manages unified data storage schema with TimescaleDB. 
+ Extends existing COBY schema with additional tables for: + - Enhanced OHLCV with technical indicators + - Order book 1s aggregations with price buckets + - Multi-timeframe order book imbalances + """ + + def __init__(self, connection_pool): + self.pool = connection_pool + self.current_version = "2.0.0" + + async def initialize_schema_tracking(self) -> None: + """Initialize schema version tracking table.""" + try: + async with self.pool.acquire() as conn: + await conn.execute(""" + CREATE TABLE IF NOT EXISTS schema_migrations ( + version VARCHAR(20) PRIMARY KEY, + applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + description TEXT, + checksum VARCHAR(64) + ); + """) + + logger.info("Schema tracking initialized") + + except Exception as e: + logger.error(f"Failed to initialize schema tracking: {e}") + raise + + async def apply_migration(self, version: str, description: str, sql_commands: List[str]) -> bool: + """Apply a database migration.""" + try: + async with self.pool.acquire() as conn: + async with conn.transaction(): + # Check if migration already applied + existing = await conn.fetchval(""" + SELECT version FROM schema_migrations WHERE version = $1 + """, version) + + if existing: + logger.info(f"Migration {version} already applied") + return True + + # Apply migration commands + for sql_command in sql_commands: + try: + await conn.execute(sql_command) + except Exception as cmd_error: + logger.error(f"Error executing command: {sql_command[:100]}... Error: {cmd_error}") + raise + + # Record migration + await conn.execute(""" + INSERT INTO schema_migrations (version, description) + VALUES ($1, $2) + """, version, description) + + logger.info(f"Applied migration {version}: {description}") + return True + + except Exception as e: + logger.error(f"Failed to apply migration {version}: {e}") + return False + + async def create_enhanced_ohlcv_table(self) -> bool: + """Create enhanced OHLCV table with technical indicators.""" + migration_commands = [ + """ + CREATE TABLE IF NOT EXISTS ohlcv_data ( + timestamp TIMESTAMPTZ NOT NULL, + symbol VARCHAR(20) NOT NULL, + timeframe VARCHAR(10) NOT NULL, + open_price DECIMAL(20,8) NOT NULL, + high_price DECIMAL(20,8) NOT NULL, + low_price DECIMAL(20,8) NOT NULL, + close_price DECIMAL(20,8) NOT NULL, + volume DECIMAL(30,8) NOT NULL, + trade_count INTEGER DEFAULT 0, + -- Technical Indicators (pre-calculated) + rsi_14 DECIMAL(10,4), + macd DECIMAL(20,8), + macd_signal DECIMAL(20,8), + macd_histogram DECIMAL(20,8), + bb_upper DECIMAL(20,8), + bb_middle DECIMAL(20,8), + bb_lower DECIMAL(20,8), + ema_12 DECIMAL(20,8), + ema_26 DECIMAL(20,8), + sma_20 DECIMAL(20,8), + PRIMARY KEY (timestamp, symbol, timeframe) + ); + """ + ] + + return await self.apply_migration( + "2.0.0", + "Create enhanced OHLCV table with technical indicators", + migration_commands + ) + + async def create_order_book_tables(self) -> bool: + """Create order book related tables.""" + migration_commands = [ + # Order book snapshots + """ + CREATE TABLE IF NOT EXISTS order_book_snapshots ( + timestamp TIMESTAMPTZ NOT NULL, + symbol VARCHAR(20) NOT NULL, + exchange VARCHAR(20) NOT NULL DEFAULT 'binance', + bids JSONB NOT NULL, + asks JSONB NOT NULL, + mid_price DECIMAL(20,8), + spread DECIMAL(20,8), + bid_volume DECIMAL(30,8), + ask_volume DECIMAL(30,8), + sequence_id BIGINT, + PRIMARY KEY (timestamp, symbol, exchange) + ); + """, + + # Order book 1s aggregations with price buckets + """ + CREATE TABLE IF NOT EXISTS order_book_1s_agg ( + timestamp TIMESTAMPTZ NOT NULL, + symbol VARCHAR(20) NOT 
NULL, + price_bucket DECIMAL(20,2) NOT NULL, + bid_volume DECIMAL(30,8) DEFAULT 0, + ask_volume DECIMAL(30,8) DEFAULT 0, + bid_count INTEGER DEFAULT 0, + ask_count INTEGER DEFAULT 0, + imbalance DECIMAL(10,6) DEFAULT 0, + PRIMARY KEY (timestamp, symbol, price_bucket) + ); + """, + + # Multi-timeframe order book imbalances + """ + CREATE TABLE IF NOT EXISTS order_book_imbalances ( + timestamp TIMESTAMPTZ NOT NULL, + symbol VARCHAR(20) NOT NULL, + imbalance_1s DECIMAL(10,6) DEFAULT 0, + imbalance_5s DECIMAL(10,6) DEFAULT 0, + imbalance_15s DECIMAL(10,6) DEFAULT 0, + imbalance_60s DECIMAL(10,6) DEFAULT 0, + volume_imbalance_1s DECIMAL(10,6) DEFAULT 0, + volume_imbalance_5s DECIMAL(10,6) DEFAULT 0, + volume_imbalance_15s DECIMAL(10,6) DEFAULT 0, + volume_imbalance_60s DECIMAL(10,6) DEFAULT 0, + price_range DECIMAL(10,2), + PRIMARY KEY (timestamp, symbol) + ); + """ + ] + + return await self.apply_migration( + "2.0.1", + "Create order book tables (snapshots, 1s aggregations, imbalances)", + migration_commands + ) + + async def create_trade_events_table(self) -> bool: + """Create trade events table.""" + migration_commands = [ + """ + CREATE TABLE IF NOT EXISTS trade_events ( + timestamp TIMESTAMPTZ NOT NULL, + symbol VARCHAR(20) NOT NULL, + exchange VARCHAR(20) NOT NULL DEFAULT 'binance', + price DECIMAL(20,8) NOT NULL, + size DECIMAL(30,8) NOT NULL, + side VARCHAR(4) NOT NULL, + trade_id VARCHAR(100) NOT NULL, + is_buyer_maker BOOLEAN, + PRIMARY KEY (timestamp, symbol, exchange, trade_id) + ); + """ + ] + + return await self.apply_migration( + "2.0.2", + "Create trade events table", + migration_commands + ) + + async def create_hypertables(self) -> bool: + """Convert tables to TimescaleDB hypertables.""" + hypertable_commands = [ + "SELECT create_hypertable('ohlcv_data', 'timestamp', if_not_exists => TRUE);", + "SELECT create_hypertable('order_book_snapshots', 'timestamp', if_not_exists => TRUE);", + "SELECT create_hypertable('order_book_1s_agg', 'timestamp', if_not_exists => TRUE);", + "SELECT create_hypertable('order_book_imbalances', 'timestamp', if_not_exists => TRUE);", + "SELECT create_hypertable('trade_events', 'timestamp', if_not_exists => TRUE);" + ] + + return await self.apply_migration( + "2.0.3", + "Convert tables to TimescaleDB hypertables", + hypertable_commands + ) + + async def create_indexes(self) -> bool: + """Create performance indexes.""" + index_commands = [ + # OHLCV indexes + "CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_tf_ts ON ohlcv_data (symbol, timeframe, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_ts ON ohlcv_data (symbol, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_ohlcv_tf_ts ON ohlcv_data (timeframe, timestamp DESC);", + + # Order book snapshots indexes + "CREATE INDEX IF NOT EXISTS idx_obs_symbol_ts ON order_book_snapshots (symbol, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_obs_exchange_ts ON order_book_snapshots (exchange, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_obs_symbol_exchange_ts ON order_book_snapshots (symbol, exchange, timestamp DESC);", + + # Order book 1s aggregation indexes + "CREATE INDEX IF NOT EXISTS idx_ob1s_symbol_ts ON order_book_1s_agg (symbol, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_ob1s_symbol_bucket_ts ON order_book_1s_agg (symbol, price_bucket, timestamp DESC);", + + # Order book imbalances indexes + "CREATE INDEX IF NOT EXISTS idx_obi_symbol_ts ON order_book_imbalances (symbol, timestamp DESC);", + + # Trade events indexes + "CREATE INDEX IF NOT EXISTS idx_trades_symbol_ts 
ON trade_events (symbol, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_trades_exchange_ts ON trade_events (exchange, timestamp DESC);", + "CREATE INDEX IF NOT EXISTS idx_trades_symbol_side_ts ON trade_events (symbol, side, timestamp DESC);" + ] + + return await self.apply_migration( + "2.0.4", + "Create performance indexes", + index_commands + ) + + async def create_continuous_aggregates(self) -> bool: + """Create continuous aggregates for multi-timeframe data.""" + aggregate_commands = [ + # 1m OHLCV from 1s data + """ + CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_1m_continuous + WITH (timescaledb.continuous) AS + SELECT + time_bucket('1 minute', timestamp) AS timestamp, + symbol, + '1m' AS timeframe, + first(open_price, timestamp) AS open_price, + max(high_price) AS high_price, + min(low_price) AS low_price, + last(close_price, timestamp) AS close_price, + sum(volume) AS volume, + sum(trade_count) AS trade_count + FROM ohlcv_data + WHERE timeframe = '1s' + GROUP BY time_bucket('1 minute', timestamp), symbol + WITH NO DATA; + """, + + # 5m OHLCV from 1m data + """ + CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_5m_continuous + WITH (timescaledb.continuous) AS + SELECT + time_bucket('5 minutes', timestamp) AS timestamp, + symbol, + '5m' AS timeframe, + first(open_price, timestamp) AS open_price, + max(high_price) AS high_price, + min(low_price) AS low_price, + last(close_price, timestamp) AS close_price, + sum(volume) AS volume, + sum(trade_count) AS trade_count + FROM ohlcv_data + WHERE timeframe = '1m' + GROUP BY time_bucket('5 minutes', timestamp), symbol + WITH NO DATA; + """, + + # 15m OHLCV from 5m data + """ + CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_15m_continuous + WITH (timescaledb.continuous) AS + SELECT + time_bucket('15 minutes', timestamp) AS timestamp, + symbol, + '15m' AS timeframe, + first(open_price, timestamp) AS open_price, + max(high_price) AS high_price, + min(low_price) AS low_price, + last(close_price, timestamp) AS close_price, + sum(volume) AS volume, + sum(trade_count) AS trade_count + FROM ohlcv_data + WHERE timeframe = '5m' + GROUP BY time_bucket('15 minutes', timestamp), symbol + WITH NO DATA; + """, + + # 1h OHLCV from 15m data + """ + CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_1h_continuous + WITH (timescaledb.continuous) AS + SELECT + time_bucket('1 hour', timestamp) AS timestamp, + symbol, + '1h' AS timeframe, + first(open_price, timestamp) AS open_price, + max(high_price) AS high_price, + min(low_price) AS low_price, + last(close_price, timestamp) AS close_price, + sum(volume) AS volume, + sum(trade_count) AS trade_count + FROM ohlcv_data + WHERE timeframe = '15m' + GROUP BY time_bucket('1 hour', timestamp), symbol + WITH NO DATA; + """, + + # 1d OHLCV from 1h data + """ + CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_1d_continuous + WITH (timescaledb.continuous) AS + SELECT + time_bucket('1 day', timestamp) AS timestamp, + symbol, + '1d' AS timeframe, + first(open_price, timestamp) AS open_price, + max(high_price) AS high_price, + min(low_price) AS low_price, + last(close_price, timestamp) AS close_price, + sum(volume) AS volume, + sum(trade_count) AS trade_count + FROM ohlcv_data + WHERE timeframe = '1h' + GROUP BY time_bucket('1 day', timestamp), symbol + WITH NO DATA; + """ + ] + + return await self.apply_migration( + "2.0.5", + "Create continuous aggregates for multi-timeframe OHLCV", + aggregate_commands + ) + + async def setup_compression_policies(self) -> bool: + """Set up compression policies for efficient storage.""" + 
compression_commands = [ + # Compress OHLCV data older than 7 days + "SELECT add_compression_policy('ohlcv_data', INTERVAL '7 days', if_not_exists => TRUE);", + + # Compress order book snapshots older than 1 day + "SELECT add_compression_policy('order_book_snapshots', INTERVAL '1 day', if_not_exists => TRUE);", + + # Compress order book 1s aggregations older than 2 days + "SELECT add_compression_policy('order_book_1s_agg', INTERVAL '2 days', if_not_exists => TRUE);", + + # Compress order book imbalances older than 2 days + "SELECT add_compression_policy('order_book_imbalances', INTERVAL '2 days', if_not_exists => TRUE);", + + # Compress trade events older than 7 days + "SELECT add_compression_policy('trade_events', INTERVAL '7 days', if_not_exists => TRUE);" + ] + + return await self.apply_migration( + "2.0.6", + "Setup compression policies", + compression_commands + ) + + async def setup_retention_policies(self) -> bool: + """Set up data retention policies.""" + retention_commands = [ + # Retain OHLCV data for 2 years + "SELECT add_retention_policy('ohlcv_data', INTERVAL '2 years', if_not_exists => TRUE);", + + # Retain order book snapshots for 30 days + "SELECT add_retention_policy('order_book_snapshots', INTERVAL '30 days', if_not_exists => TRUE);", + + # Retain order book 1s aggregations for 60 days + "SELECT add_retention_policy('order_book_1s_agg', INTERVAL '60 days', if_not_exists => TRUE);", + + # Retain order book imbalances for 60 days + "SELECT add_retention_policy('order_book_imbalances', INTERVAL '60 days', if_not_exists => TRUE);", + + # Retain trade events for 90 days + "SELECT add_retention_policy('trade_events', INTERVAL '90 days', if_not_exists => TRUE);" + ] + + return await self.apply_migration( + "2.0.7", + "Setup retention policies", + retention_commands + ) + + async def setup_complete_schema(self) -> bool: + """Set up the complete unified storage schema.""" + try: + logger.info("Setting up unified storage schema...") + + # Initialize schema tracking + await self.initialize_schema_tracking() + + # Apply all migrations in order + migrations = [ + ("Enhanced OHLCV table", self.create_enhanced_ohlcv_table), + ("Order book tables", self.create_order_book_tables), + ("Trade events table", self.create_trade_events_table), + ("Hypertables", self.create_hypertables), + ("Indexes", self.create_indexes), + ("Continuous aggregates", self.create_continuous_aggregates), + ("Compression policies", self.setup_compression_policies), + ("Retention policies", self.setup_retention_policies), + ] + + for name, migration_func in migrations: + logger.info(f"Applying migration: {name}") + success = await migration_func() + if not success: + logger.error(f"Failed to apply migration: {name}") + return False + logger.info(f"Successfully applied migration: {name}") + + logger.info("Complete unified storage schema setup successful") + return True + + except Exception as e: + logger.error(f"Failed to setup complete schema: {e}") + return False + + async def get_schema_info(self) -> Dict: + """Get information about the current schema state.""" + try: + async with self.pool.acquire() as conn: + # Get applied migrations + migrations = await conn.fetch(""" + SELECT version, applied_at, description + FROM schema_migrations + ORDER BY applied_at + """) + + # Get table information + tables = await conn.fetch(""" + SELECT + schemaname, + tablename, + pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size + FROM pg_tables + WHERE schemaname = 'public' + AND tablename IN ( + 
'ohlcv_data', + 'order_book_snapshots', + 'order_book_1s_agg', + 'order_book_imbalances', + 'trade_events' + ) + ORDER BY tablename + """) + + # Get hypertable information + hypertables = await conn.fetch(""" + SELECT + hypertable_name, + num_chunks, + compression_enabled, + pg_size_pretty(total_bytes) as total_size, + pg_size_pretty(compressed_total_bytes) as compressed_size + FROM timescaledb_information.hypertables + WHERE hypertable_schema = 'public' + ORDER BY hypertable_name + """) + + # Get continuous aggregates + continuous_aggs = await conn.fetch(""" + SELECT + view_name, + materialization_hypertable_name, + pg_size_pretty(total_bytes) as size + FROM timescaledb_information.continuous_aggregates + WHERE view_schema = 'public' + ORDER BY view_name + """) + + return { + "migrations": [dict(m) for m in migrations], + "tables": [dict(t) for t in tables], + "hypertables": [dict(h) for h in hypertables], + "continuous_aggregates": [dict(c) for c in continuous_aggs] + } + + except Exception as e: + logger.error(f"Failed to get schema info: {e}") + return {} + + async def verify_schema(self) -> bool: + """Verify that all required tables and indexes exist.""" + try: + async with self.pool.acquire() as conn: + # Check required tables + required_tables = [ + 'ohlcv_data', + 'order_book_snapshots', + 'order_book_1s_agg', + 'order_book_imbalances', + 'trade_events' + ] + + for table in required_tables: + exists = await conn.fetchval(""" + SELECT EXISTS ( + SELECT FROM pg_tables + WHERE schemaname = 'public' + AND tablename = $1 + ) + """, table) + + if not exists: + logger.error(f"Required table missing: {table}") + return False + + # Check hypertables + for table in required_tables: + is_hypertable = await conn.fetchval(""" + SELECT EXISTS ( + SELECT FROM timescaledb_information.hypertables + WHERE hypertable_schema = 'public' + AND hypertable_name = $1 + ) + """, table) + + if not is_hypertable: + logger.error(f"Table is not a hypertable: {table}") + return False + + logger.info("Schema verification successful") + return True + + except Exception as e: + logger.error(f"Schema verification failed: {e}") + return False diff --git a/docs/UNIFIED_STORAGE_SETUP.md b/docs/UNIFIED_STORAGE_SETUP.md new file mode 100644 index 0000000..c85293f --- /dev/null +++ b/docs/UNIFIED_STORAGE_SETUP.md @@ -0,0 +1,337 @@ +# Unified Data Storage Setup Guide + +## Overview + +The unified data storage system consolidates all market data storage into a single TimescaleDB backend, replacing fragmented Parquet files, pickle files, and in-memory caches. + +## Prerequisites + +### 1. PostgreSQL with TimescaleDB + +You need PostgreSQL 12+ with TimescaleDB extension installed. + +#### Installation Options + +**Option A: Docker (Recommended)** +```bash +docker run -d --name timescaledb \ + -p 5432:5432 \ + -e POSTGRES_PASSWORD=postgres \ + -e POSTGRES_DB=trading_data \ + timescale/timescaledb:latest-pg14 +``` + +**Option B: Local Installation** +- Follow TimescaleDB installation guide: https://docs.timescale.com/install/latest/ +- Create database: `createdb trading_data` + +### 2. 
Python Dependencies + +Ensure you have the required Python packages: +```bash +pip install asyncpg +``` + +## Database Configuration + +Update your `config.yaml` with database connection details: + +```yaml +database: + host: localhost + port: 5432 + name: trading_data + user: postgres + password: postgres + pool_size: 20 +``` + +## Setup Process + +### Step 1: Run Setup Script + +```bash +python scripts/setup_unified_storage.py +``` + +This script will: +1. Connect to the database +2. Verify TimescaleDB extension +3. Create all required tables +4. Convert tables to hypertables +5. Create indexes for performance +6. Set up continuous aggregates +7. Configure compression policies +8. Configure retention policies +9. Verify the setup +10. Run basic operation tests + +### Step 2: Verify Setup + +The setup script will display schema information: + +``` +=== Schema Information === +Migrations applied: 8 +Tables created: 5 +Hypertables: 5 +Continuous aggregates: 5 + +=== Table Sizes === + ohlcv_data: 8192 bytes + order_book_snapshots: 8192 bytes + order_book_1s_agg: 8192 bytes + order_book_imbalances: 8192 bytes + trade_events: 8192 bytes + +=== Hypertables === + ohlcv_data: 0 chunks, compression=enabled + order_book_snapshots: 0 chunks, compression=enabled + order_book_1s_agg: 0 chunks, compression=enabled + order_book_imbalances: 0 chunks, compression=enabled + trade_events: 0 chunks, compression=enabled + +=== Continuous Aggregates === + ohlcv_1m_continuous: 8192 bytes + ohlcv_5m_continuous: 8192 bytes + ohlcv_15m_continuous: 8192 bytes + ohlcv_1h_continuous: 8192 bytes + ohlcv_1d_continuous: 8192 bytes +``` + +## Database Schema + +### Tables + +#### 1. ohlcv_data +Stores candlestick data for all timeframes with pre-calculated technical indicators. + +**Columns:** +- `timestamp` (TIMESTAMPTZ): Candle timestamp +- `symbol` (VARCHAR): Trading pair (e.g., 'ETH/USDT') +- `timeframe` (VARCHAR): Timeframe (1s, 1m, 5m, 15m, 1h, 1d) +- `open_price`, `high_price`, `low_price`, `close_price` (DECIMAL): OHLC prices +- `volume` (DECIMAL): Trading volume +- `trade_count` (INTEGER): Number of trades +- Technical indicators: `rsi_14`, `macd`, `macd_signal`, `bb_upper`, `bb_middle`, `bb_lower`, etc. + +**Primary Key:** `(timestamp, symbol, timeframe)` + +#### 2. order_book_snapshots +Stores raw order book snapshots. + +**Columns:** +- `timestamp` (TIMESTAMPTZ): Snapshot timestamp +- `symbol` (VARCHAR): Trading pair +- `exchange` (VARCHAR): Exchange name +- `bids` (JSONB): Bid levels (top 50) +- `asks` (JSONB): Ask levels (top 50) +- `mid_price`, `spread`, `bid_volume`, `ask_volume` (DECIMAL): Calculated metrics + +**Primary Key:** `(timestamp, symbol, exchange)` + +#### 3. order_book_1s_agg +Stores 1-second aggregated order book data with $1 price buckets. + +**Columns:** +- `timestamp` (TIMESTAMPTZ): Aggregation timestamp +- `symbol` (VARCHAR): Trading pair +- `price_bucket` (DECIMAL): Price bucket ($1 increments) +- `bid_volume`, `ask_volume` (DECIMAL): Aggregated volumes +- `bid_count`, `ask_count` (INTEGER): Number of orders +- `imbalance` (DECIMAL): Order book imbalance + +**Primary Key:** `(timestamp, symbol, price_bucket)` + +#### 4. order_book_imbalances +Stores multi-timeframe order book imbalance metrics. + +**Columns:** +- `timestamp` (TIMESTAMPTZ): Calculation timestamp +- `symbol` (VARCHAR): Trading pair +- `imbalance_1s`, `imbalance_5s`, `imbalance_15s`, `imbalance_60s` (DECIMAL): Imbalances +- `volume_imbalance_1s`, `volume_imbalance_5s`, etc. 
(DECIMAL): Volume-weighted imbalances +- `price_range` (DECIMAL): Price range used for calculation + +**Primary Key:** `(timestamp, symbol)` + +#### 5. trade_events +Stores individual trade events. + +**Columns:** +- `timestamp` (TIMESTAMPTZ): Trade timestamp +- `symbol` (VARCHAR): Trading pair +- `exchange` (VARCHAR): Exchange name +- `price` (DECIMAL): Trade price +- `size` (DECIMAL): Trade size +- `side` (VARCHAR): Trade side ('buy' or 'sell') +- `trade_id` (VARCHAR): Unique trade identifier + +**Primary Key:** `(timestamp, symbol, exchange, trade_id)` + +### Continuous Aggregates + +Continuous aggregates automatically generate higher timeframe data from lower timeframes: + +1. **ohlcv_1m_continuous**: 1-minute candles from 1-second data +2. **ohlcv_5m_continuous**: 5-minute candles from 1-minute data +3. **ohlcv_15m_continuous**: 15-minute candles from 5-minute data +4. **ohlcv_1h_continuous**: 1-hour candles from 15-minute data +5. **ohlcv_1d_continuous**: 1-day candles from 1-hour data + +### Compression Policies + +Data is automatically compressed to save storage: + +- **ohlcv_data**: Compress after 7 days +- **order_book_snapshots**: Compress after 1 day +- **order_book_1s_agg**: Compress after 2 days +- **order_book_imbalances**: Compress after 2 days +- **trade_events**: Compress after 7 days + +Expected compression ratio: **>80%** + +### Retention Policies + +Old data is automatically deleted: + +- **ohlcv_data**: Retain for 2 years +- **order_book_snapshots**: Retain for 30 days +- **order_book_1s_agg**: Retain for 60 days +- **order_book_imbalances**: Retain for 60 days +- **trade_events**: Retain for 90 days + +## Performance Optimization + +### Indexes + +All tables have optimized indexes for common query patterns: + +- Symbol + timestamp queries +- Timeframe-specific queries +- Exchange-specific queries +- Multi-column composite indexes + +### Query Performance Targets + +- **Cache reads**: <10ms +- **Single timestamp queries**: <100ms +- **Time range queries (1 hour)**: <500ms +- **Ingestion throughput**: >1000 ops/sec + +### Best Practices + +1. **Use time_bucket for aggregations**: + ```sql + SELECT time_bucket('1 minute', timestamp) AS bucket, + symbol, + avg(close_price) AS avg_price + FROM ohlcv_data + WHERE symbol = 'ETH/USDT' + AND timestamp >= NOW() - INTERVAL '1 hour' + GROUP BY bucket, symbol; + ``` + +2. **Query specific timeframes**: + ```sql + SELECT * FROM ohlcv_data + WHERE symbol = 'ETH/USDT' + AND timeframe = '1m' + AND timestamp >= NOW() - INTERVAL '1 day' + ORDER BY timestamp DESC; + ``` + +3. 
**Use continuous aggregates for historical data**: + ```sql + SELECT * FROM ohlcv_1h_continuous + WHERE symbol = 'ETH/USDT' + AND timestamp >= NOW() - INTERVAL '7 days' + ORDER BY timestamp DESC; + ``` + +## Monitoring + +### Check Database Size + +```sql +SELECT + hypertable_name, + pg_size_pretty(total_bytes) AS total_size, + pg_size_pretty(compressed_total_bytes) AS compressed_size, + ROUND((1 - compressed_total_bytes::numeric / total_bytes::numeric) * 100, 2) AS compression_ratio +FROM timescaledb_information.hypertables +WHERE hypertable_schema = 'public'; +``` + +### Check Chunk Information + +```sql +SELECT + hypertable_name, + num_chunks, + num_compressed_chunks, + compression_enabled +FROM timescaledb_information.hypertables +WHERE hypertable_schema = 'public'; +``` + +### Check Continuous Aggregate Status + +```sql +SELECT + view_name, + materialization_hypertable_name, + pg_size_pretty(total_bytes) AS size +FROM timescaledb_information.continuous_aggregates +WHERE view_schema = 'public'; +``` + +## Troubleshooting + +### TimescaleDB Extension Not Found + +If you see "TimescaleDB extension not found": + +1. Ensure TimescaleDB is installed +2. Connect to database and run: `CREATE EXTENSION timescaledb;` +3. Restart the setup script + +### Connection Refused + +If you see "connection refused": + +1. Check PostgreSQL is running: `pg_isready` +2. Verify connection details in `config.yaml` +3. Check firewall settings + +### Permission Denied + +If you see "permission denied": + +1. Ensure database user has CREATE privileges +2. Grant privileges: `GRANT ALL PRIVILEGES ON DATABASE trading_data TO postgres;` + +### Slow Queries + +If queries are slow: + +1. Check if indexes exist: `\di` in psql +2. Analyze query plan: `EXPLAIN ANALYZE ` +3. Ensure compression is enabled +4. Consider adding more specific indexes + +## Next Steps + +After setup is complete: + +1. **Implement data models** (Task 2) +2. **Implement cache layer** (Task 3) +3. **Implement database connection layer** (Task 4) +4. **Start data migration** from Parquet files (Task 7) + +## Support + +For issues or questions: +- Check TimescaleDB docs: https://docs.timescale.com/ +- Review PostgreSQL logs: `tail -f /var/log/postgresql/postgresql-*.log` +- Enable debug logging in setup script diff --git a/scripts/setup_unified_storage.py b/scripts/setup_unified_storage.py new file mode 100644 index 0000000..152bf30 --- /dev/null +++ b/scripts/setup_unified_storage.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 +""" +Setup script for unified data storage system. +Initializes TimescaleDB schema and verifies setup. 
+""" + +import asyncio +import asyncpg +import logging +import sys +from pathlib import Path + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from core.unified_storage_schema import UnifiedStorageSchemaManager +from core.config import get_config + +# Setup logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +async def create_connection_pool(config): + """Create database connection pool.""" + try: + # Build connection string + db_config = config.get('database', {}) + + # Default values + host = db_config.get('host', 'localhost') + port = db_config.get('port', 5432) + database = db_config.get('name', 'trading_data') + user = db_config.get('user', 'postgres') + password = db_config.get('password', 'postgres') + + logger.info(f"Connecting to database: {host}:{port}/{database}") + + pool = await asyncpg.create_pool( + host=host, + port=port, + database=database, + user=user, + password=password, + min_size=2, + max_size=10, + command_timeout=60 + ) + + logger.info("Database connection pool created") + return pool + + except Exception as e: + logger.error(f"Failed to create connection pool: {e}") + raise + + +async def verify_timescaledb(pool): + """Verify TimescaleDB extension is available.""" + try: + async with pool.acquire() as conn: + # Check if TimescaleDB extension exists + result = await conn.fetchval(""" + SELECT EXISTS ( + SELECT FROM pg_extension WHERE extname = 'timescaledb' + ) + """) + + if result: + # Get TimescaleDB version + version = await conn.fetchval("SELECT extversion FROM pg_extension WHERE extname = 'timescaledb'") + logger.info(f"TimescaleDB extension found (version {version})") + return True + else: + logger.warning("TimescaleDB extension not found, attempting to create...") + + # Try to create extension + await conn.execute("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE") + logger.info("TimescaleDB extension created successfully") + return True + + except Exception as e: + logger.error(f"Failed to verify TimescaleDB: {e}") + logger.error("Please ensure TimescaleDB is installed: https://docs.timescale.com/install/latest/") + return False + + +async def setup_schema(pool): + """Set up the complete unified storage schema.""" + try: + schema_manager = UnifiedStorageSchemaManager(pool) + + logger.info("Starting schema setup...") + success = await schema_manager.setup_complete_schema() + + if success: + logger.info("Schema setup completed successfully") + + # Verify schema + logger.info("Verifying schema...") + verified = await schema_manager.verify_schema() + + if verified: + logger.info("Schema verification passed") + + # Get schema info + info = await schema_manager.get_schema_info() + + logger.info("\n=== Schema Information ===") + logger.info(f"Migrations applied: {len(info.get('migrations', []))}") + logger.info(f"Tables created: {len(info.get('tables', []))}") + logger.info(f"Hypertables: {len(info.get('hypertables', []))}") + logger.info(f"Continuous aggregates: {len(info.get('continuous_aggregates', []))}") + + # Display table sizes + logger.info("\n=== Table Sizes ===") + for table in info.get('tables', []): + logger.info(f" {table['tablename']}: {table['size']}") + + # Display hypertables + logger.info("\n=== Hypertables ===") + for ht in info.get('hypertables', []): + logger.info(f" {ht['hypertable_name']}: {ht['num_chunks']} chunks, " + f"compression={'enabled' if ht['compression_enabled'] else 'disabled'}") 
+ + # Display continuous aggregates + if info.get('continuous_aggregates'): + logger.info("\n=== Continuous Aggregates ===") + for ca in info.get('continuous_aggregates', []): + logger.info(f" {ca['view_name']}: {ca.get('size', 'N/A')}") + + return True + else: + logger.error("Schema verification failed") + return False + else: + logger.error("Schema setup failed") + return False + + except Exception as e: + logger.error(f"Error during schema setup: {e}") + return False + + +async def test_basic_operations(pool): + """Test basic database operations.""" + try: + logger.info("\n=== Testing Basic Operations ===") + + async with pool.acquire() as conn: + # Test insert into ohlcv_data + logger.info("Testing OHLCV insert...") + await conn.execute(""" + INSERT INTO ohlcv_data + (timestamp, symbol, timeframe, open_price, high_price, low_price, close_price, volume) + VALUES (NOW(), 'ETH/USDT', '1s', 2000.0, 2001.0, 1999.0, 2000.5, 100.0) + ON CONFLICT (timestamp, symbol, timeframe) DO NOTHING + """) + logger.info("✓ OHLCV insert successful") + + # Test query + logger.info("Testing OHLCV query...") + result = await conn.fetchrow(""" + SELECT * FROM ohlcv_data + WHERE symbol = 'ETH/USDT' + ORDER BY timestamp DESC + LIMIT 1 + """) + if result: + logger.info(f"✓ OHLCV query successful: {dict(result)}") + + # Test order book insert + logger.info("Testing order book insert...") + await conn.execute(""" + INSERT INTO order_book_snapshots + (timestamp, symbol, exchange, bids, asks, mid_price, spread) + VALUES (NOW(), 'ETH/USDT', 'binance', '[]'::jsonb, '[]'::jsonb, 2000.0, 0.1) + ON CONFLICT (timestamp, symbol, exchange) DO NOTHING + """) + logger.info("✓ Order book insert successful") + + # Test imbalances insert + logger.info("Testing imbalances insert...") + await conn.execute(""" + INSERT INTO order_book_imbalances + (timestamp, symbol, imbalance_1s, imbalance_5s, imbalance_15s, imbalance_60s) + VALUES (NOW(), 'ETH/USDT', 0.5, 0.4, 0.3, 0.2) + ON CONFLICT (timestamp, symbol) DO NOTHING + """) + logger.info("✓ Imbalances insert successful") + + logger.info("\n✓ All basic operations successful") + return True + + except Exception as e: + logger.error(f"Basic operations test failed: {e}") + return False + + +async def main(): + """Main setup function.""" + logger.info("=== Unified Data Storage Setup ===\n") + + pool = None + try: + # Load configuration + config = get_config() + + # Create connection pool + pool = await create_connection_pool(config) + + # Verify TimescaleDB + if not await verify_timescaledb(pool): + logger.error("TimescaleDB verification failed") + return 1 + + # Setup schema + if not await setup_schema(pool): + logger.error("Schema setup failed") + return 1 + + # Test basic operations + if not await test_basic_operations(pool): + logger.error("Basic operations test failed") + return 1 + + logger.info("\n=== Setup Complete ===") + logger.info("Unified data storage system is ready to use!") + return 0 + + except Exception as e: + logger.error(f"Setup failed: {e}") + return 1 + + finally: + if pool: + await pool.close() + logger.info("Database connection pool closed") + + +if __name__ == "__main__": + exit_code = asyncio.run(main()) + sys.exit(exit_code)
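
Once the setup script above reports success, application code can read from the new tables directly over asyncpg. Below is a minimal sketch, assuming the same `config.yaml` defaults used by the setup script (`localhost:5432/trading_data`, user `postgres`); `fetch_latest_candles` is an illustrative helper and not part of this patch.

```python
"""Illustrative read path against the unified schema (sketch, not part of the patch)."""

import asyncio
import asyncpg


async def fetch_latest_candles(pool: asyncpg.Pool, symbol: str, timeframe: str, limit: int = 100):
    """Fetch the most recent candles for a symbol/timeframe from ohlcv_data."""
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT timestamp, open_price, high_price, low_price, close_price, volume
            FROM ohlcv_data
            WHERE symbol = $1 AND timeframe = $2
            ORDER BY timestamp DESC
            LIMIT $3
            """,
            symbol, timeframe, limit,
        )
    # Rows come back newest-first; reverse them for chronological order.
    return list(reversed(rows))


async def main():
    # Connection settings mirror the config.yaml defaults assumed above.
    pool = await asyncpg.create_pool(
        host="localhost", port=5432, database="trading_data",
        user="postgres", password="postgres", min_size=1, max_size=5,
    )
    try:
        candles = await fetch_latest_candles(pool, "ETH/USDT", "1m", limit=10)
        for row in candles:
            print(row["timestamp"], row["close_price"], row["volume"])
    finally:
        await pool.close()


if __name__ == "__main__":
    asyncio.run(main())
```

The same pattern carries over to `order_book_1s_agg` and `order_book_imbalances`; only the column list and the `WHERE` clause change.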