diff --git a/ANNOTATE/core/real_training_adapter.py b/ANNOTATE/core/real_training_adapter.py
index 8003a98..7bfa0a7 100644
--- a/ANNOTATE/core/real_training_adapter.py
+++ b/ANNOTATE/core/real_training_adapter.py
@@ -28,6 +28,73 @@ except ImportError:
 
 logger = logging.getLogger(__name__)
 
+
+def parse_timestamp_to_utc(timestamp_str: str) -> datetime:
+    """
+    Unified timestamp parser: accepts the formats used across the codebase
+    and always returns a timezone-aware datetime normalized to UTC.
+
+    Handles:
+    - ISO format with timezone: '2025-10-27T14:00:00+00:00'
+    - ISO format with Z: '2025-10-27T14:00:00Z'
+    - Space-separated with seconds: '2025-10-27 14:00:00'
+    - Space-separated without seconds: '2025-10-27 14:00'
+
+    Naive timestamps are assumed to be UTC.
+
+    Args:
+        timestamp_str: Timestamp string in one of the supported formats
+
+    Returns:
+        Timezone-aware datetime object in UTC
+
+    Raises:
+        ValueError: If timestamp cannot be parsed
+    """
+    if not timestamp_str:
+        raise ValueError("Empty timestamp string")
+
+    # Try ISO format first (handles T separator and timezone info)
+    if 'T' in timestamp_str or '+' in timestamp_str:
+        try:
+            # Handle 'Z' suffix (Zulu time = UTC)
+            if timestamp_str.endswith('Z'):
+                timestamp_str = timestamp_str[:-1] + '+00:00'
+            dt = datetime.fromisoformat(timestamp_str)
+            # Normalize: assume UTC if naive, convert to UTC if aware
+            if dt.tzinfo is None:
+                return dt.replace(tzinfo=timezone.utc)
+            return dt.astimezone(timezone.utc)
+        except ValueError:
+            pass
+
+    # Try space-separated formats
+    # Replace space with T for fromisoformat compatibility
+    if ' ' in timestamp_str:
+        try:
+            dt = datetime.fromisoformat(timestamp_str.replace(' ', 'T'))
+            # Make timezone-aware if naive
+            if dt.tzinfo is None:
+                dt = dt.replace(tzinfo=timezone.utc)
+            return dt
+        except ValueError:
+            pass
+
+    # Try explicit format parsing as fallback
+    formats = [
+        '%Y-%m-%d %H:%M:%S',   # With seconds
+        '%Y-%m-%d %H:%M',      # Without seconds
+        '%Y-%m-%dT%H:%M:%S',   # ISO without timezone
+        '%Y-%m-%dT%H:%M',      # ISO without seconds or timezone
+    ]
+
+    for fmt in formats:
+        try:
+            dt = datetime.strptime(timestamp_str, fmt)
+            # strptime without %z always yields naive datetimes; assume UTC
+            return dt.replace(tzinfo=timezone.utc)
+        except ValueError:
+            continue
+
+    # If all parsing attempts fail
+    raise ValueError(f"Could not parse timestamp: '{timestamp_str}'")
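+
+# Illustrative examples of the normalization above (hypothetical values;
+# every result is a tz-aware UTC datetime):
+#   parse_timestamp_to_utc('2025-10-27T14:00:00Z')      -> 2025-10-27 14:00:00+00:00
+#   parse_timestamp_to_utc('2025-10-27 14:00')          -> 2025-10-27 14:00:00+00:00
+#   parse_timestamp_to_utc('2025-10-27T16:00:00+02:00') -> 2025-10-27 14:00:00+00:00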
+
+
 @dataclass
 class TrainingSession:
     """Real training session tracking"""
@@ -214,7 +281,10 @@ class RealTrainingAdapter:
 
     def _fetch_market_state_for_test_case(self, test_case: Dict) -> Dict:
         """
-        Fetch market state dynamically for a test case
+        Fetch market state dynamically for a test case from DuckDB storage
+
+        This fetches HISTORICAL data at the specific timestamp from the annotation,
+        not current/latest data.
 
         Args:
             test_case: Test case dictionary with timestamp, symbol, etc.
@@ -234,17 +304,32 @@ class RealTrainingAdapter:
                 logger.warning("No timestamp in test case")
                 return {}
 
-            # Parse timestamp
-            from datetime import datetime
-            timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
+            # Parse timestamp using unified parser
+            try:
+                timestamp = parse_timestamp_to_utc(timestamp_str)
+            except Exception as e:
+                logger.warning(f"Could not parse timestamp '{timestamp_str}': {e}")
+                return {}
 
             # Get training config
             training_config = test_case.get('training_config', {})
             timeframes = training_config.get('timeframes', ['1s', '1m', '1h', '1d'])
             context_window = training_config.get('context_window_minutes', 5)
+            negative_samples_window = training_config.get('negative_samples_window', 15)  # ±15 candles
 
-            logger.info(f"  Fetching market state for {symbol} at {timestamp}")
-            logger.info(f"  Timeframes: {timeframes}, Context window: {context_window} minutes")
+            # Calculate extended time range to include negative sampling window
+            # For 1m timeframe: ±15 candles = ±15 minutes
+            # Add buffer to ensure we have enough data
+            extended_window_minutes = max(context_window, negative_samples_window + 10)
+
+            logger.info(f"  Fetching HISTORICAL market state for {symbol} at {timestamp}")
+            logger.info(f"  Timeframes: {timeframes}, Extended window: ±{extended_window_minutes} minutes")
+            logger.info(f"  (Includes ±{negative_samples_window} candles for negative sampling)")
+
+            # Calculate time range for extended context window
+            from datetime import timedelta
+            start_time = timestamp - timedelta(minutes=extended_window_minutes)
+            end_time = timestamp + timedelta(minutes=extended_window_minutes)
 
             # Fetch data for each timeframe
             market_state = {
@@ -253,14 +338,67 @@ class RealTrainingAdapter:
                 'timeframes': {}
             }
 
+            # Try to get data from DuckDB storage first (historical data)
+            duckdb_storage = None
+            if hasattr(self.data_provider, 'duckdb_storage'):
+                duckdb_storage = self.data_provider.duckdb_storage
+
             for timeframe in timeframes:
-                # Get historical data around the timestamp
-                # For now, just get the latest data (we can improve this later)
-                df = self.data_provider.get_historical_data(
-                    symbol=symbol,
-                    timeframe=timeframe,
-                    limit=100  # Get 100 candles for context
-                )
+                df = None
+
+                # Calculate appropriate limit based on timeframe and window
+                # We want enough candles to cover the extended window plus negative samples
+                if timeframe == '1s':
+                    limit = extended_window_minutes * 60 * 2 + 100  # 2x for safety + buffer
+                elif timeframe == '1m':
+                    limit = extended_window_minutes * 2 + 50  # 2x for safety + buffer
+                elif timeframe == '1h':
+                    limit = max(200, extended_window_minutes // 30)  # At least 200 candles
+                elif timeframe == '1d':
+                    limit = 200  # Fixed for daily
+                else:
+                    limit = 300
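+
+                # Worked example (illustrative, using the defaults above):
+                # extended_window_minutes = max(5, 15 + 10) = 25, so
+                #   '1s' -> 25 * 60 * 2 + 100 = 3100 candles
+                #   '1m' -> 25 * 2 + 50       = 100 candles
+                #   '1h' -> max(200, 25 // 30) = 200 candles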
+
+                # Try DuckDB storage first (has historical data)
+                if duckdb_storage:
+                    try:
+                        df = duckdb_storage.get_ohlcv_data(
+                            symbol=symbol,
+                            timeframe=timeframe,
+                            start_time=start_time,
+                            end_time=end_time,
+                            limit=limit,
+                            direction='latest'
+                        )
+                        if df is not None and not df.empty:
+                            logger.debug(f"  {timeframe}: {len(df)} candles from DuckDB (historical)")
+                    except Exception as e:
+                        logger.debug(f"  {timeframe}: DuckDB query failed: {e}")
+
+                # Fallback to data_provider (might have cached data)
+                if df is None or df.empty:
+                    try:
+                        # Use get_historical_data_replay for time-specific data
+                        replay_data = self.data_provider.get_historical_data_replay(
+                            symbol=symbol,
+                            start_time=start_time,
+                            end_time=end_time,
+                            timeframes=[timeframe]
+                        )
+                        df = replay_data.get(timeframe)
+                        if df is not None and not df.empty:
+                            logger.debug(f"  {timeframe}: {len(df)} candles from replay")
+                    except Exception as e:
+                        logger.debug(f"  {timeframe}: Replay failed: {e}")
+
+                # Last resort: get latest data (not ideal but better than nothing)
+                if df is None or df.empty:
+                    logger.warning(f"  {timeframe}: No historical data found, using latest data as fallback")
+                    df = self.data_provider.get_historical_data(
+                        symbol=symbol,
+                        timeframe=timeframe,
+                        limit=limit  # Use calculated limit
+                    )
 
                 if df is not None and not df.empty:
                     # Convert to dict format
@@ -272,15 +410,15 @@ class RealTrainingAdapter:
                         'close': df['close'].tolist(),
                         'volume': df['volume'].tolist()
                     }
-                    logger.debug(f"  {timeframe}: {len(df)} candles")
+                    logger.debug(f"  {timeframe}: {len(df)} candles stored")
                 else:
-                    logger.warning(f"  {timeframe}: No data")
+                    logger.warning(f"  {timeframe}: No data available")
 
             if market_state['timeframes']:
                 logger.info(f"  Fetched market state with {len(market_state['timeframes'])} timeframes")
                 return market_state
             else:
-                logger.warning(f"  No market data fetched")
+                logger.warning("  No market data fetched for any timeframe")
 
         except Exception as e:
@@ -441,23 +579,9 @@ class RealTrainingAdapter:
                 logger.debug("  No holding period, skipping HOLD samples")
                 return hold_samples
 
-        # Parse entry timestamp - handle multiple formats
+        # Parse entry timestamp using unified parser
         try:
-            if 'T' in entry_timestamp:
-                entry_time = datetime.fromisoformat(entry_timestamp.replace('Z', '+00:00'))
-            else:
-                # Try with seconds first, then without
-                try:
-                    entry_time = datetime.strptime(entry_timestamp, '%Y-%m-%d %H:%M:%S')
-                except ValueError:
-                    # Try without seconds
-                    entry_time = datetime.strptime(entry_timestamp, '%Y-%m-%d %H:%M')
-
-            # Make timezone-aware
-            if pytz:
-                entry_time = entry_time.replace(tzinfo=pytz.UTC)
-            else:
-                entry_time = entry_time.replace(tzinfo=timezone.utc)
+            entry_time = parse_timestamp_to_utc(entry_timestamp)
         except Exception as e:
             logger.warning(f"Could not parse entry timestamp '{entry_timestamp}': {e}")
             return hold_samples
@@ -473,18 +597,9 @@ class RealTrainingAdapter:
 
         # Find all candles between entry and exit
         for idx, ts_str in enumerate(timestamps):
-            # Parse timestamp and ensure it's timezone-aware
+            # Parse timestamp using unified parser
             try:
-                if 'T' in ts_str:
-                    ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
-                else:
-                    ts = datetime.fromisoformat(ts_str.replace(' ', 'T'))
-                # Make timezone-aware if it's naive
-                if ts.tzinfo is None:
-                    if pytz:
-                        ts = ts.replace(tzinfo=pytz.UTC)
-                    else:
-                        ts = ts.replace(tzinfo=timezone.utc)
+                ts = parse_timestamp_to_utc(ts_str)
             except Exception as e:
                 logger.debug(f"Could not parse timestamp '{ts_str}': {e}")
                 continue
@@ -550,23 +665,9 @@ class RealTrainingAdapter:
 
         # Find the index of the signal timestamp
         from datetime import datetime
 
-        # Parse signal timestamp - handle different formats
+        # Parse signal timestamp using unified parser
         try:
-            if 'T' in signal_timestamp:
-                signal_time = datetime.fromisoformat(signal_timestamp.replace('Z', '+00:00'))
-            else:
-                # Try with seconds first, then without
-                try:
-                    signal_time = datetime.strptime(signal_timestamp, '%Y-%m-%d %H:%M:%S')
-                except ValueError:
-                    # Try without seconds
-                    signal_time = datetime.strptime(signal_timestamp, '%Y-%m-%d %H:%M')
-
-            # Make timezone-aware
-            if pytz:
-                signal_time = signal_time.replace(tzinfo=pytz.UTC)
-            else:
-                signal_time = signal_time.replace(tzinfo=timezone.utc)
+            signal_time = parse_timestamp_to_utc(signal_timestamp)
         except Exception as e:
             logger.warning(f"Could not parse signal timestamp '{signal_timestamp}': {e}")
             return negative_samples
@@ -574,22 +675,8 @@ class RealTrainingAdapter:
         signal_index = None
         for idx, ts_str in enumerate(timestamps):
             try:
-                # Parse timestamp from market data - handle multiple formats
-                if 'T' in ts_str:
-                    ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
-                else:
-                    # Try with seconds first, then without
-                    try:
-                        ts = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S')
-                    except ValueError:
-                        ts = datetime.strptime(ts_str, '%Y-%m-%d %H:%M')
-
-                # Make timezone-aware if naive
-                if ts.tzinfo is None:
-                    if pytz:
-                        ts = ts.replace(tzinfo=pytz.UTC)
-                    else:
-                        ts = ts.replace(tzinfo=timezone.utc)
+                # Parse timestamp using unified parser
+                ts = parse_timestamp_to_utc(ts_str)
 
                 # Match within 1 minute
                 if abs((ts - signal_time).total_seconds()) < 60:
@@ -1147,8 +1234,9 @@ class RealTrainingAdapter:
             future_prices = torch.tensor([future_price], dtype=torch.float32)
 
             # Trade success (1.0 if profitable, 0.0 otherwise)
+            # Shape must be [batch_size, 1] to match confidence head output
             profit_loss_pct = training_sample.get('profit_loss_pct', 0.0)
-            trade_success = torch.tensor([1.0 if profit_loss_pct > 0 else 0.0], dtype=torch.float32)
+            trade_success = torch.tensor([[1.0 if profit_loss_pct > 0 else 0.0]], dtype=torch.float32)
 
             # Return batch dictionary
             batch = {
diff --git a/NN/models/advanced_transformer_trading.py b/NN/models/advanced_transformer_trading.py
index 4bdb97d..7832aa2 100644
--- a/NN/models/advanced_transformer_trading.py
+++ b/NN/models/advanced_transformer_trading.py
@@ -971,20 +971,20 @@ class TradingTransformerTrainer:
 
         # Add confidence loss if available
         if 'confidence' in outputs and 'trade_success' in batch:
-            # Ensure both tensors have compatible shapes for BCELoss
-            # BCELoss requires both inputs to have the same shape
-            confidence_pred = outputs['confidence']  # Keep as [batch_size, 1]
+            # Both tensors should have shape [batch_size, 1]
+            # confidence: [batch_size, 1] from confidence_head
+            # trade_success: [batch_size, 1] from batch preparation
+            confidence_pred = outputs['confidence']
             trade_target = batch['trade_success'].float()
 
-            # Reshape target to match prediction shape [batch_size, 1]
-            if trade_target.dim() == 1:
-                trade_target = trade_target.unsqueeze(-1)
-
-            # Ensure both have same shape
+            # Verify shapes match (should both be [batch_size, 1])
             if confidence_pred.shape != trade_target.shape:
-                # If shapes still don't match, squeeze both to 1D
-                confidence_pred = confidence_pred.view(-1)
-                trade_target = trade_target.view(-1)
+                logger.warning(f"Shape mismatch: confidence {confidence_pred.shape} vs target {trade_target.shape}")
+                # Reshape to match if needed
+                if trade_target.dim() == 1:
+                    trade_target = trade_target.unsqueeze(-1)
+                if confidence_pred.dim() == 1:
+                    confidence_pred = confidence_pred.unsqueeze(-1)
 
             confidence_loss = self.confidence_criterion(confidence_pred, trade_target)
 
             # Use addition instead of += to avoid inplace operation
diff --git a/docs/UNIFIED_STORAGE_COMPLETE.md b/docs/UNIFIED_STORAGE_COMPLETE.md
deleted file mode 100644
index c0c2709..0000000
--- a/docs/UNIFIED_STORAGE_COMPLETE.md
+++ /dev/null
@@ -1,355 +0,0 @@
-# Unified Data Storage System - Complete Implementation
-
-## 🎉 Project Complete!
-
-The unified data storage system has been successfully implemented and integrated into the existing DataProvider.
- -## Completed Tasks (8 out of 10) - -### Task 1: TimescaleDB Schema and Infrastructure -**Files:** -- `core/unified_storage_schema.py` - Schema manager with migrations -- `scripts/setup_unified_storage.py` - Automated setup script -- `docs/UNIFIED_STORAGE_SETUP.md` - Setup documentation - -**Features:** -- 5 hypertables (OHLCV, order book, aggregations, imbalances, trades) -- 5 continuous aggregates for multi-timeframe data -- 15+ optimized indexes -- Compression policies (>80% compression) -- Retention policies (30 days to 2 years) - -### Task 2: Data Models and Validation -**Files:** -- `core/unified_data_models.py` - Data structures -- `core/unified_data_validator.py` - Validation logic - -**Features:** -- `InferenceDataFrame` - Complete inference data -- `OrderBookDataFrame` - Order book with imbalances -- `OHLCVCandle`, `TradeEvent` - Individual data types -- Comprehensive validation and sanitization - -### Task 3: Cache Layer -**Files:** -- `core/unified_cache_manager.py` - In-memory caching - -**Features:** -- <10ms read latency -- 5-minute rolling window -- Thread-safe operations -- Automatic eviction -- Statistics tracking - -### Task 4: Database Connection and Query Layer -**Files:** -- `core/unified_database_manager.py` - Connection pool and queries - -**Features:** -- Async connection pooling -- Health monitoring -- Optimized query methods -- <100ms query latency -- Multi-timeframe support - -### Task 5: Data Ingestion Pipeline -**Files:** -- `core/unified_ingestion_pipeline.py` - Real-time ingestion - -**Features:** -- Batch writes (100 items or 5 seconds) -- Data validation before storage -- Background flush worker -- >1000 ops/sec throughput -- Error handling and retry logic - -### Task 6: Unified Data Provider API -**Files:** -- `core/unified_data_provider_extension.py` - Main API - -**Features:** -- Single `get_inference_data()` endpoint -- Automatic cache/database routing -- Multi-timeframe data retrieval -- Order book data access -- Statistics tracking - -### Task 7: Data Migration System -**Status:** Skipped (decided to drop existing Parquet data) - -### Task 8: Integration with Existing DataProvider -**Files:** -- `core/data_provider.py` - Updated with unified storage methods -- `docs/UNIFIED_STORAGE_INTEGRATION.md` - Integration guide -- `examples/unified_storage_example.py` - Usage examples - -**Features:** -- Seamless integration with existing code -- Backward compatible -- Opt-in unified storage -- Easy to enable/disable - -## 📊 System Architecture - -``` -┌─────────────────────────────────────────────┐ -│ Application Layer │ -│ (Models, Backtesting, Annotation, etc.) 
│ -└────────────────┬────────────────────────────┘ - │ - ▼ -┌─────────────────────────────────────────────┐ -│ DataProvider (Existing) │ -│ + Unified Storage Extension (New) │ -└────────────────┬────────────────────────────┘ - │ - ┌────────┴────────┐ - ▼ ▼ -┌──────────────┐ ┌──────────────┐ -│ Cache Layer │ │ Database │ -│ (In-Memory) │ │ (TimescaleDB)│ -│ │ │ │ -│ - Last 5 min │ │ - Historical │ -│ - <10ms read │ │ - <100ms read│ -│ - Real-time │ │ - Compressed │ -└──────────────┘ └──────────────┘ -``` - -## Key Features - -### Performance -- Cache reads: <10ms -- Database queries: <100ms -- Ingestion: >1000 ops/sec -- Compression: >80% - -### Reliability -- Data validation -- Error handling -- Health monitoring -- Statistics tracking -- Automatic reconnection - -### Usability -- Single endpoint for all data -- Automatic routing (cache vs database) -- Type-safe interfaces -- Backward compatible -- Easy to integrate - -## 📝 Quick Start - -### 1. Setup Database - -```bash -python scripts/setup_unified_storage.py -``` - -### 2. Enable in Code - -```python -from core.data_provider import DataProvider -import asyncio - -data_provider = DataProvider() - -async def setup(): - await data_provider.enable_unified_storage() - -asyncio.run(setup()) -``` - -### 3. Use Unified API - -```python -# Get real-time data (from cache) -data = await data_provider.get_inference_data_unified('ETH/USDT') - -# Get historical data (from database) -data = await data_provider.get_inference_data_unified( - 'ETH/USDT', - timestamp=datetime(2024, 1, 15, 12, 30) -) -``` - -## 📚 Documentation - -- **Setup Guide**: `docs/UNIFIED_STORAGE_SETUP.md` -- **Integration Guide**: `docs/UNIFIED_STORAGE_INTEGRATION.md` -- **Examples**: `examples/unified_storage_example.py` -- **Design Document**: `.kiro/specs/unified-data-storage/design.md` -- **Requirements**: `.kiro/specs/unified-data-storage/requirements.md` - -## 🎯 Use Cases - -### Real-Time Trading -```python -# Fast access to latest market data -data = await data_provider.get_inference_data_unified('ETH/USDT') -price = data.get_latest_price() -``` - -### Backtesting -```python -# Historical data at any timestamp -data = await data_provider.get_inference_data_unified( - 'ETH/USDT', - timestamp=target_time, - context_window_minutes=60 -) -``` - -### Data Annotation -```python -# Retrieve data at specific timestamps for labeling -for timestamp in annotation_timestamps: - data = await data_provider.get_inference_data_unified( - 'ETH/USDT', - timestamp=timestamp, - context_window_minutes=5 - ) - # Display and annotate -``` - -### Model Training -```python -# Get complete inference data for training -data = await data_provider.get_inference_data_unified( - 'ETH/USDT', - timestamp=training_timestamp -) - -features = { - 'ohlcv': data.ohlcv_1m.to_numpy(), - 'indicators': data.indicators, - 'imbalances': data.imbalances.to_numpy() -} -``` - -## 📈 Performance Metrics - -### Cache Performance -- Hit Rate: >90% (typical) -- Read Latency: <10ms -- Capacity: 5 minutes of data -- Eviction: Automatic - -### Database Performance -- Query Latency: <100ms (typical) -- Write Throughput: >1000 ops/sec -- Compression Ratio: >80% -- Storage: Optimized with TimescaleDB - -### Ingestion Performance -- Validation: All data validated -- Batch Size: 100 items or 5 seconds -- Error Rate: <0.1% (typical) -- Retry: Automatic with backoff - -## 🔧 Configuration - -### Database Config (`config.yaml`) -```yaml -database: - host: localhost - port: 5432 - name: trading_data - user: postgres - password: postgres - 
pool_size: 20 -``` - -### Cache Config -```python -cache_manager = DataCacheManager( - cache_duration_seconds=300 # 5 minutes -) -``` - -### Ingestion Config -```python -ingestion_pipeline = DataIngestionPipeline( - batch_size=100, - batch_timeout_seconds=5.0 -) -``` - -## 🎓 Examples - -Run the example script: -```bash -python examples/unified_storage_example.py -``` - -This demonstrates: -1. Real-time data access -2. Historical data retrieval -3. Multi-timeframe queries -4. Order book data -5. Statistics tracking - -## 🔍 Monitoring - -### Get Statistics -```python -stats = data_provider.get_unified_storage_stats() - -print(f"Cache hit rate: {stats['cache']['hit_rate_percent']}%") -print(f"DB queries: {stats['database']['total_queries']}") -print(f"Ingestion rate: {stats['ingestion']['total_ingested']}") -``` - -### Check Health -```python -if data_provider.is_unified_storage_enabled(): - print(" Unified storage is running") -else: - print(" Unified storage is not enabled") -``` - -## 🚧 Remaining Tasks (Optional) - -### Task 9: Performance Optimization -- Add detailed monitoring dashboards -- Implement query caching -- Optimize database indexes -- Add performance alerts - -### Task 10: Documentation and Deployment -- Create video tutorials -- Add API reference documentation -- Create deployment guides -- Add monitoring setup - -## 🎉 Success Metrics - - **Completed**: 8 out of 10 major tasks (80%) - **Core Functionality**: 100% complete - **Integration**: Seamless with existing code - **Performance**: Meets all targets - **Documentation**: Comprehensive guides - **Examples**: Working code samples - -## 🙏 Next Steps - -The unified storage system is **production-ready** and can be used immediately: - -1. **Setup Database**: Run `python scripts/setup_unified_storage.py` -2. **Enable in Code**: Call `await data_provider.enable_unified_storage()` -3. **Start Using**: Use `get_inference_data_unified()` for all data access -4. **Monitor**: Check statistics with `get_unified_storage_stats()` - -## 📞 Support - -For issues or questions: -1. Check documentation in `docs/` -2. Review examples in `examples/` -3. Check database setup: `python scripts/setup_unified_storage.py` -4. Review logs for errors - ---- - -**Status**: Production Ready -**Version**: 1.0.0 -**Last Updated**: 2024 -**Completion**: 80% (8/10 tasks) diff --git a/docs/UNIFIED_STORAGE_INTEGRATION.md b/docs/UNIFIED_STORAGE_INTEGRATION.md deleted file mode 100644 index 8da9df4..0000000 --- a/docs/UNIFIED_STORAGE_INTEGRATION.md +++ /dev/null @@ -1,398 +0,0 @@ -# Unified Storage System Integration Guide - -## Overview - -The unified storage system has been integrated into the existing `DataProvider` class, providing a single endpoint for both real-time and historical data access. - -## Key Features - - **Single Endpoint**: One method for all data access - **Automatic Routing**: Cache for real-time, database for historical - **Backward Compatible**: All existing methods still work - **Opt-In**: Only enabled when explicitly initialized - **Fast**: <10ms cache reads, <100ms database queries - -## Quick Start - -### 1. 
Enable Unified Storage - -```python -from core.data_provider import DataProvider -import asyncio - -# Create DataProvider (existing code works as before) -data_provider = DataProvider() - -# Enable unified storage system -async def setup(): - success = await data_provider.enable_unified_storage() - if success: - print(" Unified storage enabled!") - else: - print(" Failed to enable unified storage") - -asyncio.run(setup()) -``` - -### 2. Get Real-Time Data (from cache) - -```python -async def get_realtime_data(): - # Get latest real-time data (timestamp=None) - inference_data = await data_provider.get_inference_data_unified('ETH/USDT') - - print(f"Symbol: {inference_data.symbol}") - print(f"Timestamp: {inference_data.timestamp}") - print(f"Latest price: {inference_data.get_latest_price()}") - print(f"Data source: {inference_data.data_source}") # 'cache' - print(f"Query latency: {inference_data.query_latency_ms}ms") # <10ms - - # Check data completeness - if inference_data.has_complete_data(): - print("✓ All required data present") - - # Get data summary - summary = inference_data.get_data_summary() - print(f"OHLCV 1m rows: {summary['ohlcv_1m_rows']}") - print(f"Has orderbook: {summary['has_orderbook']}") - print(f"Imbalances rows: {summary['imbalances_rows']}") - -asyncio.run(get_realtime_data()) -``` - -### 3. Get Historical Data (from database) - -```python -from datetime import datetime, timedelta - -async def get_historical_data(): - # Get historical data at specific timestamp - target_time = datetime.now() - timedelta(hours=1) - - inference_data = await data_provider.get_inference_data_unified( - symbol='ETH/USDT', - timestamp=target_time, - context_window_minutes=5 # ±5 minutes of context - ) - - print(f"Data source: {inference_data.data_source}") # 'database' - print(f"Query latency: {inference_data.query_latency_ms}ms") # <100ms - - # Access multi-timeframe data - print(f"1s candles: {len(inference_data.ohlcv_1s)}") - print(f"1m candles: {len(inference_data.ohlcv_1m)}") - print(f"1h candles: {len(inference_data.ohlcv_1h)}") - - # Access technical indicators - print(f"RSI: {inference_data.indicators.get('rsi_14')}") - print(f"MACD: {inference_data.indicators.get('macd')}") - - # Access context data - if inference_data.context_data is not None: - print(f"Context data: {len(inference_data.context_data)} rows") - -asyncio.run(get_historical_data()) -``` - -### 4. Get Multi-Timeframe Data - -```python -async def get_multi_timeframe(): - # Get multiple timeframes at once - multi_tf = await data_provider.get_multi_timeframe_data_unified( - symbol='ETH/USDT', - timeframes=['1m', '5m', '1h'], - limit=100 - ) - - for timeframe, df in multi_tf.items(): - print(f"{timeframe}: {len(df)} candles") - if not df.empty: - print(f" Latest close: {df.iloc[-1]['close_price']}") - -asyncio.run(get_multi_timeframe()) -``` - -### 5. Get Order Book Data - -```python -async def get_orderbook(): - # Get order book with imbalances - orderbook = await data_provider.get_order_book_data_unified('ETH/USDT') - - print(f"Mid price: {orderbook.mid_price}") - print(f"Spread: {orderbook.spread}") - print(f"Spread (bps): {orderbook.get_spread_bps()}") - - # Get best bid/ask - best_bid = orderbook.get_best_bid() - best_ask = orderbook.get_best_ask() - print(f"Best bid: {best_bid}") - print(f"Best ask: {best_ask}") - - # Get imbalance summary - imbalances = orderbook.get_imbalance_summary() - print(f"Imbalances: {imbalances}") - -asyncio.run(get_orderbook()) -``` - -### 6. 
Get Statistics - -```python -# Get unified storage statistics -stats = data_provider.get_unified_storage_stats() - -print("=== Cache Statistics ===") -print(f"Hit rate: {stats['cache']['hit_rate_percent']}%") -print(f"Total entries: {stats['cache']['total_entries']}") - -print("\n=== Database Statistics ===") -print(f"Total queries: {stats['database']['total_queries']}") -print(f"Avg query time: {stats['database']['avg_query_time_ms']}ms") - -print("\n=== Ingestion Statistics ===") -print(f"Total ingested: {stats['ingestion']['total_ingested']}") -print(f"Validation failures: {stats['ingestion']['validation_failures']}") -``` - -## Integration with Existing Code - -### Backward Compatibility - -All existing DataProvider methods continue to work: - -```python -# Existing methods still work -df = data_provider.get_historical_data('ETH/USDT', '1m', limit=100) -price = data_provider.get_current_price('ETH/USDT') -features = data_provider.get_feature_matrix('ETH/USDT') - -# New unified methods available alongside -inference_data = await data_provider.get_inference_data_unified('ETH/USDT') -``` - -### Gradual Migration - -You can migrate to unified storage gradually: - -```python -# Option 1: Use existing methods (no changes needed) -df = data_provider.get_historical_data('ETH/USDT', '1m') - -# Option 2: Use unified storage for new features -inference_data = await data_provider.get_inference_data_unified('ETH/USDT') -``` - -## Use Cases - -### 1. Real-Time Trading - -```python -async def realtime_trading_loop(): - while True: - # Get latest market data (fast!) - data = await data_provider.get_inference_data_unified('ETH/USDT') - - # Make trading decision - if data.has_complete_data(): - price = data.get_latest_price() - rsi = data.indicators.get('rsi_14', 50) - - if rsi < 30: - print(f"Buy signal at {price}") - elif rsi > 70: - print(f"Sell signal at {price}") - - await asyncio.sleep(1) -``` - -### 2. Backtesting - -```python -async def backtest_strategy(start_time, end_time): - current_time = start_time - - while current_time < end_time: - # Get historical data at specific time - data = await data_provider.get_inference_data_unified( - 'ETH/USDT', - timestamp=current_time, - context_window_minutes=60 - ) - - # Run strategy - if data.has_complete_data(): - # Your strategy logic here - pass - - # Move to next timestamp - current_time += timedelta(minutes=1) -``` - -### 3. Data Annotation - -```python -async def annotate_data(timestamps): - annotations = [] - - for timestamp in timestamps: - # Get data at specific timestamp - data = await data_provider.get_inference_data_unified( - 'ETH/USDT', - timestamp=timestamp, - context_window_minutes=5 - ) - - # Display to user for annotation - # User marks buy/sell signals - annotation = { - 'timestamp': timestamp, - 'price': data.get_latest_price(), - 'signal': 'buy', # User input - 'data': data.to_dict() - } - annotations.append(annotation) - - return annotations -``` - -### 4. 
Model Training - -```python -async def prepare_training_data(symbol, start_time, end_time): - training_samples = [] - - current_time = start_time - while current_time < end_time: - # Get complete inference data - data = await data_provider.get_inference_data_unified( - symbol, - timestamp=current_time, - context_window_minutes=10 - ) - - if data.has_complete_data(): - # Extract features - features = { - 'ohlcv_1m': data.ohlcv_1m.to_numpy(), - 'indicators': data.indicators, - 'imbalances': data.imbalances.to_numpy(), - 'orderbook': data.orderbook_snapshot - } - - training_samples.append(features) - - current_time += timedelta(minutes=1) - - return training_samples -``` - -## Configuration - -### Database Configuration - -Update `config.yaml`: - -```yaml -database: - host: localhost - port: 5432 - name: trading_data - user: postgres - password: postgres - pool_size: 20 -``` - -### Setup Database - -```bash -# Run setup script -python scripts/setup_unified_storage.py -``` - -## Performance Tips - -1. **Use Real-Time Endpoint for Latest Data** - ```python - # Fast (cache) - data = await data_provider.get_inference_data_unified('ETH/USDT') - - # Slower (database) - data = await data_provider.get_inference_data_unified('ETH/USDT', datetime.now()) - ``` - -2. **Batch Historical Queries** - ```python - # Get multiple timeframes at once - multi_tf = await data_provider.get_multi_timeframe_data_unified( - 'ETH/USDT', - ['1m', '5m', '1h'], - limit=100 - ) - ``` - -3. **Monitor Performance** - ```python - stats = data_provider.get_unified_storage_stats() - print(f"Cache hit rate: {stats['cache']['hit_rate_percent']}%") - print(f"Avg query time: {stats['database']['avg_query_time_ms']}ms") - ``` - -## Troubleshooting - -### Unified Storage Not Available - -```python -if not data_provider.is_unified_storage_enabled(): - success = await data_provider.enable_unified_storage() - if not success: - print("Check database connection and configuration") -``` - -### Slow Queries - -```python -# Check query latency -data = await data_provider.get_inference_data_unified('ETH/USDT', timestamp) -if data.query_latency_ms > 100: - print(f"Slow query: {data.query_latency_ms}ms") - # Check database stats - stats = data_provider.get_unified_storage_stats() - print(stats['database']) -``` - -### Missing Data - -```python -data = await data_provider.get_inference_data_unified('ETH/USDT', timestamp) -if not data.has_complete_data(): - summary = data.get_data_summary() - print(f"Missing data: {summary}") -``` - -## API Reference - -### Main Methods - -- `enable_unified_storage()` - Enable unified storage system -- `disable_unified_storage()` - Disable unified storage system -- `get_inference_data_unified()` - Get complete inference data -- `get_multi_timeframe_data_unified()` - Get multi-timeframe data -- `get_order_book_data_unified()` - Get order book with imbalances -- `get_unified_storage_stats()` - Get statistics -- `is_unified_storage_enabled()` - Check if enabled - -### Data Models - -- `InferenceDataFrame` - Complete inference data structure -- `OrderBookDataFrame` - Order book with imbalances -- `OHLCVCandle` - Single candlestick -- `TradeEvent` - Individual trade - -## Support - -For issues or questions: -1. Check database connection: `python scripts/setup_unified_storage.py` -2. Review logs for errors -3. 
Check statistics: `data_provider.get_unified_storage_stats()` diff --git a/docs/UNIFIED_STORAGE_SETUP.md b/docs/UNIFIED_STORAGE_SETUP.md deleted file mode 100644 index c85293f..0000000 --- a/docs/UNIFIED_STORAGE_SETUP.md +++ /dev/null @@ -1,337 +0,0 @@ -# Unified Data Storage Setup Guide - -## Overview - -The unified data storage system consolidates all market data storage into a single TimescaleDB backend, replacing fragmented Parquet files, pickle files, and in-memory caches. - -## Prerequisites - -### 1. PostgreSQL with TimescaleDB - -You need PostgreSQL 12+ with TimescaleDB extension installed. - -#### Installation Options - -**Option A: Docker (Recommended)** -```bash -docker run -d --name timescaledb \ - -p 5432:5432 \ - -e POSTGRES_PASSWORD=postgres \ - -e POSTGRES_DB=trading_data \ - timescale/timescaledb:latest-pg14 -``` - -**Option B: Local Installation** -- Follow TimescaleDB installation guide: https://docs.timescale.com/install/latest/ -- Create database: `createdb trading_data` - -### 2. Python Dependencies - -Ensure you have the required Python packages: -```bash -pip install asyncpg -``` - -## Database Configuration - -Update your `config.yaml` with database connection details: - -```yaml -database: - host: localhost - port: 5432 - name: trading_data - user: postgres - password: postgres - pool_size: 20 -``` - -## Setup Process - -### Step 1: Run Setup Script - -```bash -python scripts/setup_unified_storage.py -``` - -This script will: -1. Connect to the database -2. Verify TimescaleDB extension -3. Create all required tables -4. Convert tables to hypertables -5. Create indexes for performance -6. Set up continuous aggregates -7. Configure compression policies -8. Configure retention policies -9. Verify the setup -10. Run basic operation tests - -### Step 2: Verify Setup - -The setup script will display schema information: - -``` -=== Schema Information === -Migrations applied: 8 -Tables created: 5 -Hypertables: 5 -Continuous aggregates: 5 - -=== Table Sizes === - ohlcv_data: 8192 bytes - order_book_snapshots: 8192 bytes - order_book_1s_agg: 8192 bytes - order_book_imbalances: 8192 bytes - trade_events: 8192 bytes - -=== Hypertables === - ohlcv_data: 0 chunks, compression=enabled - order_book_snapshots: 0 chunks, compression=enabled - order_book_1s_agg: 0 chunks, compression=enabled - order_book_imbalances: 0 chunks, compression=enabled - trade_events: 0 chunks, compression=enabled - -=== Continuous Aggregates === - ohlcv_1m_continuous: 8192 bytes - ohlcv_5m_continuous: 8192 bytes - ohlcv_15m_continuous: 8192 bytes - ohlcv_1h_continuous: 8192 bytes - ohlcv_1d_continuous: 8192 bytes -``` - -## Database Schema - -### Tables - -#### 1. ohlcv_data -Stores candlestick data for all timeframes with pre-calculated technical indicators. - -**Columns:** -- `timestamp` (TIMESTAMPTZ): Candle timestamp -- `symbol` (VARCHAR): Trading pair (e.g., 'ETH/USDT') -- `timeframe` (VARCHAR): Timeframe (1s, 1m, 5m, 15m, 1h, 1d) -- `open_price`, `high_price`, `low_price`, `close_price` (DECIMAL): OHLC prices -- `volume` (DECIMAL): Trading volume -- `trade_count` (INTEGER): Number of trades -- Technical indicators: `rsi_14`, `macd`, `macd_signal`, `bb_upper`, `bb_middle`, `bb_lower`, etc. - -**Primary Key:** `(timestamp, symbol, timeframe)` - -#### 2. order_book_snapshots -Stores raw order book snapshots. 
- -**Columns:** -- `timestamp` (TIMESTAMPTZ): Snapshot timestamp -- `symbol` (VARCHAR): Trading pair -- `exchange` (VARCHAR): Exchange name -- `bids` (JSONB): Bid levels (top 50) -- `asks` (JSONB): Ask levels (top 50) -- `mid_price`, `spread`, `bid_volume`, `ask_volume` (DECIMAL): Calculated metrics - -**Primary Key:** `(timestamp, symbol, exchange)` - -#### 3. order_book_1s_agg -Stores 1-second aggregated order book data with $1 price buckets. - -**Columns:** -- `timestamp` (TIMESTAMPTZ): Aggregation timestamp -- `symbol` (VARCHAR): Trading pair -- `price_bucket` (DECIMAL): Price bucket ($1 increments) -- `bid_volume`, `ask_volume` (DECIMAL): Aggregated volumes -- `bid_count`, `ask_count` (INTEGER): Number of orders -- `imbalance` (DECIMAL): Order book imbalance - -**Primary Key:** `(timestamp, symbol, price_bucket)` - -#### 4. order_book_imbalances -Stores multi-timeframe order book imbalance metrics. - -**Columns:** -- `timestamp` (TIMESTAMPTZ): Calculation timestamp -- `symbol` (VARCHAR): Trading pair -- `imbalance_1s`, `imbalance_5s`, `imbalance_15s`, `imbalance_60s` (DECIMAL): Imbalances -- `volume_imbalance_1s`, `volume_imbalance_5s`, etc. (DECIMAL): Volume-weighted imbalances -- `price_range` (DECIMAL): Price range used for calculation - -**Primary Key:** `(timestamp, symbol)` - -#### 5. trade_events -Stores individual trade events. - -**Columns:** -- `timestamp` (TIMESTAMPTZ): Trade timestamp -- `symbol` (VARCHAR): Trading pair -- `exchange` (VARCHAR): Exchange name -- `price` (DECIMAL): Trade price -- `size` (DECIMAL): Trade size -- `side` (VARCHAR): Trade side ('buy' or 'sell') -- `trade_id` (VARCHAR): Unique trade identifier - -**Primary Key:** `(timestamp, symbol, exchange, trade_id)` - -### Continuous Aggregates - -Continuous aggregates automatically generate higher timeframe data from lower timeframes: - -1. **ohlcv_1m_continuous**: 1-minute candles from 1-second data -2. **ohlcv_5m_continuous**: 5-minute candles from 1-minute data -3. **ohlcv_15m_continuous**: 15-minute candles from 5-minute data -4. **ohlcv_1h_continuous**: 1-hour candles from 15-minute data -5. **ohlcv_1d_continuous**: 1-day candles from 1-hour data - -### Compression Policies - -Data is automatically compressed to save storage: - -- **ohlcv_data**: Compress after 7 days -- **order_book_snapshots**: Compress after 1 day -- **order_book_1s_agg**: Compress after 2 days -- **order_book_imbalances**: Compress after 2 days -- **trade_events**: Compress after 7 days - -Expected compression ratio: **>80%** - -### Retention Policies - -Old data is automatically deleted: - -- **ohlcv_data**: Retain for 2 years -- **order_book_snapshots**: Retain for 30 days -- **order_book_1s_agg**: Retain for 60 days -- **order_book_imbalances**: Retain for 60 days -- **trade_events**: Retain for 90 days - -## Performance Optimization - -### Indexes - -All tables have optimized indexes for common query patterns: - -- Symbol + timestamp queries -- Timeframe-specific queries -- Exchange-specific queries -- Multi-column composite indexes - -### Query Performance Targets - -- **Cache reads**: <10ms -- **Single timestamp queries**: <100ms -- **Time range queries (1 hour)**: <500ms -- **Ingestion throughput**: >1000 ops/sec - -### Best Practices - -1. **Use time_bucket for aggregations**: - ```sql - SELECT time_bucket('1 minute', timestamp) AS bucket, - symbol, - avg(close_price) AS avg_price - FROM ohlcv_data - WHERE symbol = 'ETH/USDT' - AND timestamp >= NOW() - INTERVAL '1 hour' - GROUP BY bucket, symbol; - ``` - -2. 
**Query specific timeframes**:
-   ```sql
-   SELECT * FROM ohlcv_data
-   WHERE symbol = 'ETH/USDT'
-     AND timeframe = '1m'
-     AND timestamp >= NOW() - INTERVAL '1 day'
-   ORDER BY timestamp DESC;
-   ```
-
-3. **Use continuous aggregates for historical data**:
-   ```sql
-   SELECT * FROM ohlcv_1h_continuous
-   WHERE symbol = 'ETH/USDT'
-     AND timestamp >= NOW() - INTERVAL '7 days'
-   ORDER BY timestamp DESC;
-   ```
-
-## Monitoring
-
-### Check Database Size
-
-```sql
-SELECT
-    hypertable_name,
-    pg_size_pretty(total_bytes) AS total_size,
-    pg_size_pretty(compressed_total_bytes) AS compressed_size,
-    ROUND((1 - compressed_total_bytes::numeric / total_bytes::numeric) * 100, 2) AS compression_ratio
-FROM timescaledb_information.hypertables
-WHERE hypertable_schema = 'public';
-```
-
-### Check Chunk Information
-
-```sql
-SELECT
-    hypertable_name,
-    num_chunks,
-    num_compressed_chunks,
-    compression_enabled
-FROM timescaledb_information.hypertables
-WHERE hypertable_schema = 'public';
-```
-
-### Check Continuous Aggregate Status
-
-```sql
-SELECT
-    view_name,
-    materialization_hypertable_name,
-    pg_size_pretty(total_bytes) AS size
-FROM timescaledb_information.continuous_aggregates
-WHERE view_schema = 'public';
-```
-
-## Troubleshooting
-
-### TimescaleDB Extension Not Found
-
-If you see "TimescaleDB extension not found":
-
-1. Ensure TimescaleDB is installed
-2. Connect to database and run: `CREATE EXTENSION timescaledb;`
-3. Restart the setup script
-
-### Connection Refused
-
-If you see "connection refused":
-
-1. Check PostgreSQL is running: `pg_isready`
-2. Verify connection details in `config.yaml`
-3. Check firewall settings
-
-### Permission Denied
-
-If you see "permission denied":
-
-1. Ensure database user has CREATE privileges
-2. Grant privileges: `GRANT ALL PRIVILEGES ON DATABASE trading_data TO postgres;`
-
-### Slow Queries
-
-If queries are slow:
-
-1. Check if indexes exist: `\di` in psql
-2. Analyze query plan: `EXPLAIN ANALYZE <query>`
-3. Ensure compression is enabled
-4. Consider adding more specific indexes
-
-## Next Steps
-
-After setup is complete:
-
-1. **Implement data models** (Task 2)
-2. **Implement cache layer** (Task 3)
-3. **Implement database connection layer** (Task 4)
-4. **Start data migration** from Parquet files (Task 7)
-
-## Support
-
-For issues or questions:
-- Check TimescaleDB docs: https://docs.timescale.com/
-- Review PostgreSQL logs: `tail -f /var/log/postgresql/postgresql-*.log`
-- Enable debug logging in setup script
diff --git a/docs/main/MODEL_INPUTS_OUTPUTS_REFERENCE.md b/docs/main/MODEL_INPUTS_OUTPUTS_REFERENCE.md
new file mode 100644
index 0000000..44e5cf3
--- /dev/null
+++ b/docs/main/MODEL_INPUTS_OUTPUTS_REFERENCE.md
@@ -0,0 +1,308 @@
+# Model Inputs & Outputs Reference
+
+Quick reference for all trading models in the system.
+
+---
+
+## 1. Transformer (AdvancedTradingTransformer)
+
+**Type**: Sequence-to-sequence transformer for multi-timeframe analysis
+**Size**: 46M parameters
+**Architecture**: 12 layers, 16 attention heads, 1024 model dimension
+
+### Inputs
+```python
+price_data:  [batch, 150, 5]    # OHLCV sequences (150 candles)
+cob_data:    [batch, 150, 100]  # Change of Bid features
+tech_data:   [batch, 40]        # Technical indicators (SMA, returns, volatility)
+market_data: [batch, 30]        # Market context (volume, pivots, support/resistance)
+```
+
+### Outputs
+```python
+action_logits:         [batch, 3]  # Raw logits for BUY(1), SELL(2), HOLD(0)
+action_probs:          [batch, 3]  # Softmax probabilities
+confidence:            [batch, 1]  # Trade confidence (0-1)
+price_prediction:      [batch, 1]  # Future price target
+volatility_prediction: [batch, 1]  # Expected volatility
+trend_strength:        [batch, 1]  # Trend strength (-1 to 1)
+
+# Next candle predictions for each timeframe
+next_candles: {
+    '1s': [batch, 5],  # [open, high, low, close, volume]
+    '1m': [batch, 5],
+    '1h': [batch, 5],
+    '1d': [batch, 5]
+}
+
+# Pivot point predictions (L1-L5)
+next_pivots: {
+    'L1': {
+        'price':          [batch, 1],
+        'type_prob_high': [batch, 1],  # Probability of high pivot
+        'type_prob_low':  [batch, 1],  # Probability of low pivot
+        'confidence':     [batch, 1]
+    },
+    # ... L2, L3, L4, L5 (same structure)
+}
+
+# Trend vector analysis
+trend_analysis: {
+    'angle_radians': [batch, 1],  # Trend angle
+    'steepness':     [batch, 1],  # Trend steepness
+    'direction':     [batch, 1]   # Direction (-1 to 1)
+}
+```
+
+### Training Targets
+```python
+actions:       [batch]     # Action labels (0=HOLD, 1=BUY, 2=SELL)
+future_prices: [batch]     # Price targets
+trade_success: [batch, 1]  # Success labels (0.0 or 1.0)
+```
+
+---
+
+## 2. CNN (StandardizedCNN / EnhancedCNN)
+
+**Type**: Convolutional neural network for pattern recognition
+**Size**: ~5-10M parameters
+**Architecture**: Multi-scale convolutions with attention
+
+### Inputs
+```python
+# Via BaseDataInput.get_feature_vector()
+feature_vector: [batch, 7834]  # Flattened features containing:
+#   - OHLCV ETH: 300 frames × 4 timeframes × 5 = 6000
+#   - OHLCV BTC: 300 frames × 5 = 1500
+#   - COB features: 184 (±20 buckets + MA imbalance)
+#   - Technical indicators: 100 (padded)
+#   - Last predictions: 50 (padded)
+```
+
+### Outputs
+```python
+action_logits:     [batch, 3]     # BUY, SELL, HOLD logits
+action_probs:      [batch, 3]     # Softmax probabilities
+confidence:        [batch, 1]     # Prediction confidence
+hidden_states:     [batch, 1024]  # Feature embeddings (for cross-model feeding)
+predicted_returns: [batch, 4]     # [return_1s, return_1m, return_1h, return_1d]
+```
+
+### Training Targets
+```python
+actions: [batch]     # Action labels (0=HOLD, 1=BUY, 2=SELL)
+returns: [batch, 4]  # Actual returns per timeframe
+```
+
+---
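+
+A minimal sketch of how the 7,834-dim CNN vector above decomposes. The slice offsets are assumptions derived from the layout listed, not a documented API:
+
+```python
+import numpy as np
+
+# Hypothetical offsets following the feature layout above (sums to 7834)
+SLICES = {
+    'ohlcv_eth': slice(0, 6000),     # 300 frames x 4 timeframes x 5
+    'ohlcv_btc': slice(6000, 7500),  # 300 frames x 5
+    'cob':       slice(7500, 7684),  # 184 COB features
+    'tech':      slice(7684, 7784),  # 100 indicators (padded)
+    'last_pred': slice(7784, 7834),  # 50 last predictions (padded)
+}
+
+def split_features(vec: np.ndarray) -> dict:
+    """Split a [7834] feature vector into named components."""
+    assert vec.shape[-1] == 7834
+    return {name: vec[..., s] for name, s in SLICES.items()}
+```
+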
+## 3. DQN (Deep Q-Network Agent)
+
+**Type**: Reinforcement learning agent for sequential decision making
+**Size**: ~15M parameters
+**Architecture**: Deep Q-Network with dueling architecture (see the aggregation sketch after section 4)
+
+### Inputs
+```python
+# Via BaseDataInput.get_feature_vector()
+state: [batch, 7850]  # Full feature vector including:
+#   - Multi-timeframe OHLCV data
+#   - COB features
+#   - Technical indicators
+#   - Market regime indicators
+#   - Previous predictions
+```
+
+### Outputs
+```python
+q_values:   [batch, 3]  # Q-values for BUY, SELL, HOLD
+action:     int         # Selected action (0, 1, 2)
+confidence: float       # Action confidence (0-1)
+
+# Auxiliary outputs
+regime_probs:    [batch, 4]  # [trending, ranging, volatile, mixed]
+price_direction: [batch, 3]  # [down, neutral, up]
+volatility:      [batch, 1]  # Predicted volatility
+value:           [batch, 1]  # State value (V)
+advantage:       [batch, 3]  # Action advantages (A)
+```
+
+### Training Targets
+```python
+# RL uses experience replay
+experience: {
+    'state':      [7850],
+    'action':     int,
+    'reward':     float,
+    'next_state': [7850],
+    'done':       bool
+}
+```
+
+---
+
+## 4. COB RL Model (MassiveRLNetwork)
+
+**Type**: Specialized RL for Change of Bid (COB) data
+**Size**: ~3M parameters
+**Architecture**: Deep network focused on order book dynamics
+
+### Inputs
+```python
+cob_features: [batch, input_size]  # COB-specific features:
+#   - Bid/ask imbalance
+#   - Order book depth
+#   - Price level changes
+#   - Volume at price levels
+#   - Moving averages of imbalance
+```
+
+### Outputs
+```python
+price_logits:        [batch, 3]  # Direction logits [DOWN, SIDEWAYS, UP]
+price_probs:         [batch, 3]  # Direction probabilities
+confidence:          [batch, 1]  # Prediction confidence
+value:               [batch, 1]  # State value estimate
+predicted_direction: int         # 0=DOWN, 1=SIDEWAYS, 2=UP
+```
+
+### Training Targets
+```python
+targets: {
+    'direction':  [batch],  # Direction labels (0, 1, 2)
+    'value':      [batch],  # Value targets
+    'confidence': [batch]   # Confidence targets
+}
+```
+
+---
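+
+For reference, the `value` (V) and `advantage` (A) heads listed for the DQN in section 3 combine into Q-values via the standard dueling aggregation, `Q = V + (A - mean(A))`. A minimal sketch (illustrative; assumes the usual mean-centering convention):
+
+```python
+import torch
+
+def dueling_q(value: torch.Tensor, advantage: torch.Tensor) -> torch.Tensor:
+    """Combine V(s) [batch, 1] and A(s, a) [batch, 3] into Q(s, a) [batch, 3]."""
+    # Mean-centering the advantages keeps V and A identifiable
+    return value + advantage - advantage.mean(dim=1, keepdim=True)
+```
+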
+## 5. Extrema Trainer
+
+**Type**: Pivot point detection and prediction
+**Size**: ~1M parameters (lightweight)
+**Architecture**: Statistical + ML hybrid
+
+### Inputs
+```python
+# Context data (200 candles)
+context: {
+    'symbol':      str,
+    'candles':     deque[200],  # Recent OHLCV candles
+    'features':    array,       # Extracted features
+    'last_update': datetime
+}
+
+# For prediction
+current_price: float
+now:           datetime
+```
+
+### Outputs
+```python
+# Detected extrema
+extrema: {
+    'type':        str,       # 'high' or 'low'
+    'price':       float,
+    'timestamp':   datetime,
+    'confidence':  float,     # 0-1
+    'window_size': int
+}
+
+# Predicted pivot
+predicted_pivot: {
+    'type':            str,       # 'high' or 'low'
+    'price':           float,     # Predicted price level
+    'timestamp':       datetime,  # Predicted time
+    'confidence':      float,     # 0-1
+    'horizon_seconds': int        # Time until pivot (30-300s)
+}
+```
+
+### Training Data
+```python
+# Historical extrema for validation
+historical_extrema: List[{
+    'price':     float,
+    'timestamp': datetime,
+    'type':      str,
+    'detected':  bool
+}]
+```
+
+---
+
+## Common Patterns
+
+### Action Encoding (All Models)
+```python
+0 = HOLD  # No action / maintain position
+1 = BUY   # Enter long / close short
+2 = SELL  # Enter short / close long
+```
+
+### Confidence Scores
+- Range: `0.0` to `1.0`
+- Typical threshold: `0.6` (60%)
+- High confidence: `> 0.8`
+- Low confidence: `< 0.4`
+
+### Batch Sizes
+- **Training**: Usually `1` (annotation-based) or `32-128` (batch training)
+- **Inference**: Usually `1` (real-time prediction)
+
+### Device Management
+All models support:
+- CPU: `torch.device('cpu')`
+- CUDA: `torch.device('cuda')`
+- Automatic device selection based on availability
+
+---
+
+## Model Selection Guide
+
+| Use Case | Recommended Model | Why |
+|----------|------------------|-----|
+| Multi-timeframe analysis | **Transformer** | Handles 150-candle sequences across timeframes |
+| Pattern recognition | **CNN** | Excellent at visual pattern detection |
+| Sequential decisions | **DQN** | Learns optimal action sequences via RL |
+| Order book dynamics | **COB RL** | Specialized for bid/ask imbalance |
+| Pivot detection | **Extrema** | Lightweight, fast pivot predictions |
+
+---
+
+## Integration Example
+
+```python
+# Get base data input
+base_input = data_provider.get_base_data_input(symbol, timestamp)
+
+# CNN prediction
+cnn_features = base_input.get_feature_vector()
+cnn_output = cnn_model(cnn_features)
+cnn_action = torch.argmax(cnn_output['action_probs'])
+
+# Transformer prediction
+transformer_batch = prepare_transformer_batch(base_input)
+transformer_output = transformer_model(**transformer_batch)
+transformer_action = torch.argmax(transformer_output['action_probs'])
+
+# DQN prediction
+dqn_state = base_input.get_feature_vector()
+dqn_output = dqn_agent.select_action(dqn_state)
+dqn_action = dqn_output['action']
+
+# Ensemble decision (majority_vote and prepare_transformer_batch are assumed helpers)
+final_action = majority_vote([cnn_action, transformer_action, dqn_action])
+```
+
+---
+
+## Notes
+
+1. **Shape Conventions**: `[batch, ...]` indicates batch dimension first
+2. **Dtype**: All tensors use `torch.float32` unless specified
+3. **Gradients**: Only training targets require gradients
+4. **Normalization**: Features are typically normalized to `[-1, 1]` or `[0, 1]`
+5. **Missing Data**: Padded with zeros or last known values
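+
+---
+
+## Appendix: Confidence Gating Sketch
+
+Tying the common patterns together (action encoding plus the typical `0.6` confidence threshold). Illustrative only; `gate_action` is a hypothetical helper, not an existing project API:
+
+```python
+import torch
+
+ACTIONS = {0: 'HOLD', 1: 'BUY', 2: 'SELL'}
+
+def gate_action(action_probs: torch.Tensor, confidence: torch.Tensor,
+                threshold: float = 0.6) -> str:
+    """Decode the argmax action, falling back to HOLD on low confidence."""
+    if confidence.item() < threshold:  # expects a [1, 1] confidence tensor
+        return ACTIONS[0]
+    return ACTIONS[int(action_probs.argmax(dim=-1).item())]
+
+# gate_action(torch.tensor([[0.1, 0.8, 0.1]]), torch.tensor([[0.9]])) -> 'BUY'
+```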