load market data for training/inference

Dobromir Popov
2025-10-31 01:58:07 +02:00
parent cefd30d2bd
commit 07150fd019
6 changed files with 483 additions and 1177 deletions

View File

@@ -28,6 +28,73 @@ except ImportError:
logger = logging.getLogger(__name__)
def parse_timestamp_to_utc(timestamp_str: str) -> datetime:
"""
Unified timestamp parser that handles all formats and ensures UTC timezone.
Handles:
- ISO format with timezone: '2025-10-27T14:00:00+00:00'
- ISO format with Z: '2025-10-27T14:00:00Z'
- Space-separated with seconds: '2025-10-27 14:00:00'
- Space-separated without seconds: '2025-10-27 14:00'
Args:
timestamp_str: Timestamp string in various formats
Returns:
Timezone-aware datetime object in UTC
Raises:
ValueError: If timestamp cannot be parsed
"""
if not timestamp_str:
raise ValueError("Empty timestamp string")
# Try ISO format first (handles T separator and timezone info)
if 'T' in timestamp_str or '+' in timestamp_str:
try:
# Handle 'Z' suffix (Zulu time = UTC)
if timestamp_str.endswith('Z'):
timestamp_str = timestamp_str[:-1] + '+00:00'
dt = datetime.fromisoformat(timestamp_str)
# Normalize to UTC: attach UTC if naive (e.g. '2025-10-27T14:00:00'), convert if offset-aware
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = dt.astimezone(timezone.utc)
return dt
except ValueError:
pass
# Try space-separated formats
# Replace space with T for fromisoformat compatibility
if ' ' in timestamp_str:
try:
# Try parsing with fromisoformat after converting space to T
dt = datetime.fromisoformat(timestamp_str.replace(' ', 'T'))
# Make timezone-aware if naive
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
pass
# Try explicit format parsing as fallback
formats = [
'%Y-%m-%d %H:%M:%S', # With seconds
'%Y-%m-%d %H:%M', # Without seconds
'%Y-%m-%dT%H:%M:%S', # ISO without timezone
'%Y-%m-%dT%H:%M', # ISO without seconds or timezone
]
for fmt in formats:
try:
dt = datetime.strptime(timestamp_str, fmt)
# Make timezone-aware
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
except ValueError:
continue
# If all parsing attempts fail
raise ValueError(f"Could not parse timestamp: '{timestamp_str}'")
@dataclass
class TrainingSession:
"""Real training session tracking"""
@@ -214,7 +281,10 @@ class RealTrainingAdapter:
def _fetch_market_state_for_test_case(self, test_case: Dict) -> Dict:
"""
Fetch market state dynamically for a test case
Fetch market state dynamically for a test case from DuckDB storage
This fetches HISTORICAL data at the specific timestamp from the annotation,
not current/latest data.
Args:
test_case: Test case dictionary with timestamp, symbol, etc.
@@ -234,17 +304,32 @@ class RealTrainingAdapter:
logger.warning("No timestamp in test case")
return {}
# Parse timestamp
from datetime import datetime
timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
# Parse timestamp using unified parser
try:
timestamp = parse_timestamp_to_utc(timestamp_str)
except Exception as e:
logger.warning(f"Could not parse timestamp '{timestamp_str}': {e}")
return {}
# Get training config
training_config = test_case.get('training_config', {})
timeframes = training_config.get('timeframes', ['1s', '1m', '1h', '1d'])
context_window = training_config.get('context_window_minutes', 5)
negative_samples_window = training_config.get('negative_samples_window', 15) # ±15 candles
logger.info(f" Fetching market state for {symbol} at {timestamp}")
logger.info(f" Timeframes: {timeframes}, Context window: {context_window} minutes")
# Calculate extended time range to include negative sampling window
# For 1m timeframe: ±15 candles = ±15 minutes
# Add buffer to ensure we have enough data
extended_window_minutes = max(context_window, negative_samples_window + 10)
logger.info(f" Fetching HISTORICAL market state for {symbol} at {timestamp}")
logger.info(f" Timeframes: {timeframes}, Extended window: ±{extended_window_minutes} minutes")
logger.info(f" (Includes ±{negative_samples_window} candles for negative sampling)")
# Calculate time range for extended context window
from datetime import timedelta
start_time = timestamp - timedelta(minutes=extended_window_minutes)
end_time = timestamp + timedelta(minutes=extended_window_minutes)
# Fetch data for each timeframe
market_state = {
@@ -253,13 +338,66 @@ class RealTrainingAdapter:
'timeframes': {}
}
# Try to get data from DuckDB storage first (historical data)
duckdb_storage = None
if hasattr(self.data_provider, 'duckdb_storage'):
duckdb_storage = self.data_provider.duckdb_storage
for timeframe in timeframes:
# Get historical data around the timestamp
# For now, just get the latest data (we can improve this later)
df = None
# Calculate appropriate limit based on timeframe and window
# We want enough candles to cover the extended window plus negative samples
if timeframe == '1s':
limit = extended_window_minutes * 60 * 2 + 100 # 2x for safety + buffer
elif timeframe == '1m':
limit = extended_window_minutes * 2 + 50 # 2x for safety + buffer
elif timeframe == '1h':
limit = max(200, extended_window_minutes // 30) # At least 200 candles
elif timeframe == '1d':
limit = 200 # Fixed for daily
else:
limit = 300
# Try DuckDB storage first (has historical data)
if duckdb_storage:
try:
df = duckdb_storage.get_ohlcv_data(
symbol=symbol,
timeframe=timeframe,
start_time=start_time,
end_time=end_time,
limit=limit,
direction='latest'
)
if df is not None and not df.empty:
logger.debug(f" {timeframe}: {len(df)} candles from DuckDB (historical)")
except Exception as e:
logger.debug(f" {timeframe}: DuckDB query failed: {e}")
# Fallback to data_provider (might have cached data)
if df is None or df.empty:
try:
# Use get_historical_data_replay for time-specific data
replay_data = self.data_provider.get_historical_data_replay(
symbol=symbol,
start_time=start_time,
end_time=end_time,
timeframes=[timeframe]
)
df = replay_data.get(timeframe)
if df is not None and not df.empty:
logger.debug(f" {timeframe}: {len(df)} candles from replay")
except Exception as e:
logger.debug(f" {timeframe}: Replay failed: {e}")
# Last resort: get latest data (not ideal but better than nothing)
if df is None or df.empty:
logger.warning(f" {timeframe}: No historical data found, using latest data as fallback")
df = self.data_provider.get_historical_data(
symbol=symbol,
timeframe=timeframe,
limit=100 # Get 100 candles for context
limit=limit # Use calculated limit
)
if df is not None and not df.empty:
@@ -272,15 +410,15 @@ class RealTrainingAdapter:
'close': df['close'].tolist(),
'volume': df['volume'].tolist()
}
logger.debug(f" {timeframe}: {len(df)} candles")
logger.debug(f" {timeframe}: {len(df)} candles stored")
else:
logger.warning(f" {timeframe}: No data")
logger.warning(f" {timeframe}: No data available")
if market_state['timeframes']:
logger.info(f" Fetched market state with {len(market_state['timeframes'])} timeframes")
return market_state
else:
logger.warning(f" No market data fetched")
logger.warning(f" No market data fetched for any timeframe")
return {}
except Exception as e:
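For orientation, the `market_state` dictionary assembled here ends up shaped roughly as below. Only `timeframes`, `close`, and `volume` are visible in this hunk; the metadata fields and the remaining per-candle keys are assumptions inferred from how later hunks iterate candle timestamps:

```python
# Illustrative layout only -- fields marked "assumed" are not shown in this diff hunk
market_state = {
    # ... symbol / timestamp metadata (elided in the hunk) ...
    'timeframes': {
        '1m': {
            'close':  [2511.3, 2513.2],   # per-candle closes (shown in the diff)
            'volume': [134.2, 98.7],      # per-candle volumes (shown in the diff)
            # 'timestamps', 'open', 'high', 'low' are assumed to follow the same list-per-column layout
        },
        # '1s', '1h', '1d' are populated the same way when data is available
    },
}
```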
@@ -441,23 +579,9 @@ class RealTrainingAdapter:
logger.debug(" No holding period, skipping HOLD samples")
return hold_samples
# Parse entry timestamp - handle multiple formats
# Parse entry timestamp using unified parser
try:
if 'T' in entry_timestamp:
entry_time = datetime.fromisoformat(entry_timestamp.replace('Z', '+00:00'))
else:
# Try with seconds first, then without
try:
entry_time = datetime.strptime(entry_timestamp, '%Y-%m-%d %H:%M:%S')
except ValueError:
# Try without seconds
entry_time = datetime.strptime(entry_timestamp, '%Y-%m-%d %H:%M')
# Make timezone-aware
if pytz:
entry_time = entry_time.replace(tzinfo=pytz.UTC)
else:
entry_time = entry_time.replace(tzinfo=timezone.utc)
entry_time = parse_timestamp_to_utc(entry_timestamp)
except Exception as e:
logger.warning(f"Could not parse entry timestamp '{entry_timestamp}': {e}")
return hold_samples
@@ -473,18 +597,9 @@ class RealTrainingAdapter:
# Find all candles between entry and exit
for idx, ts_str in enumerate(timestamps):
# Parse timestamp and ensure it's timezone-aware
# Parse timestamp using unified parser
try:
if 'T' in ts_str:
ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
else:
ts = datetime.fromisoformat(ts_str.replace(' ', 'T'))
# Make timezone-aware if it's naive
if ts.tzinfo is None:
if pytz:
ts = ts.replace(tzinfo=pytz.UTC)
else:
ts = ts.replace(tzinfo=timezone.utc)
ts = parse_timestamp_to_utc(ts_str)
except Exception as e:
logger.debug(f"Could not parse timestamp '{ts_str}': {e}")
continue
@@ -550,23 +665,9 @@ class RealTrainingAdapter:
# Find the index of the signal timestamp
from datetime import datetime
# Parse signal timestamp - handle different formats
# Parse signal timestamp using unified parser
try:
if 'T' in signal_timestamp:
signal_time = datetime.fromisoformat(signal_timestamp.replace('Z', '+00:00'))
else:
# Try with seconds first, then without
try:
signal_time = datetime.strptime(signal_timestamp, '%Y-%m-%d %H:%M:%S')
except ValueError:
# Try without seconds
signal_time = datetime.strptime(signal_timestamp, '%Y-%m-%d %H:%M')
# Make timezone-aware
if pytz:
signal_time = signal_time.replace(tzinfo=pytz.UTC)
else:
signal_time = signal_time.replace(tzinfo=timezone.utc)
signal_time = parse_timestamp_to_utc(signal_timestamp)
except Exception as e:
logger.warning(f"Could not parse signal timestamp '{signal_timestamp}': {e}")
return negative_samples
@@ -574,22 +675,8 @@ class RealTrainingAdapter:
signal_index = None
for idx, ts_str in enumerate(timestamps):
try:
# Parse timestamp from market data - handle multiple formats
if 'T' in ts_str:
ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
else:
# Try with seconds first, then without
try:
ts = datetime.strptime(ts_str, '%Y-%m-%d %H:%M:%S')
except ValueError:
ts = datetime.strptime(ts_str, '%Y-%m-%d %H:%M')
# Make timezone-aware if naive
if ts.tzinfo is None:
if pytz:
ts = ts.replace(tzinfo=pytz.UTC)
else:
ts = ts.replace(tzinfo=timezone.utc)
# Parse timestamp using unified parser
ts = parse_timestamp_to_utc(ts_str)
# Match within 1 minute
if abs((ts - signal_time).total_seconds()) < 60:
@@ -1147,8 +1234,9 @@ class RealTrainingAdapter:
future_prices = torch.tensor([future_price], dtype=torch.float32)
# Trade success (1.0 if profitable, 0.0 otherwise)
# Shape must be [batch_size, 1] to match confidence head output
profit_loss_pct = training_sample.get('profit_loss_pct', 0.0)
trade_success = torch.tensor([1.0 if profit_loss_pct > 0 else 0.0], dtype=torch.float32)
trade_success = torch.tensor([[1.0 if profit_loss_pct > 0 else 0.0]], dtype=torch.float32)
# Return batch dictionary
batch = {

View File

@@ -971,20 +971,20 @@ class TradingTransformerTrainer:
# Add confidence loss if available
if 'confidence' in outputs and 'trade_success' in batch:
# Ensure both tensors have compatible shapes for BCELoss
# BCELoss requires both inputs to have the same shape
confidence_pred = outputs['confidence'] # Keep as [batch_size, 1]
# Both tensors should have shape [batch_size, 1]
# confidence: [batch_size, 1] from confidence_head
# trade_success: [batch_size, 1] from batch preparation
confidence_pred = outputs['confidence']
trade_target = batch['trade_success'].float()
# Reshape target to match prediction shape [batch_size, 1]
# Verify shapes match (should both be [batch_size, 1])
if confidence_pred.shape != trade_target.shape:
logger.warning(f"Shape mismatch: confidence {confidence_pred.shape} vs target {trade_target.shape}")
# Reshape to match if needed
if trade_target.dim() == 1:
trade_target = trade_target.unsqueeze(-1)
# Ensure both have same shape
if confidence_pred.shape != trade_target.shape:
# If shapes still don't match, squeeze both to 1D
confidence_pred = confidence_pred.view(-1)
trade_target = trade_target.view(-1)
if confidence_pred.dim() == 1:
confidence_pred = confidence_pred.unsqueeze(-1)
confidence_loss = self.confidence_criterion(confidence_pred, trade_target)
# Use addition instead of += to avoid inplace operation
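As a standalone illustration of the shape contract being enforced here: `nn.BCELoss` expects prediction and target to have identical shapes, so a `[batch, 1]` confidence output paired with a `[batch]` target produces a broadcasting warning or an outright error depending on the PyTorch version. The double-bracketed `trade_success` tensor and the `unsqueeze(-1)` guard above keep both sides at `[batch, 1]`. A minimal sketch (not the trainer's actual code):

```python
import torch
import torch.nn as nn

criterion = nn.BCELoss()

confidence_pred = torch.sigmoid(torch.randn(4, 1))    # [batch, 1], like the confidence head output
trade_target_1d = torch.tensor([1.0, 0.0, 1.0, 0.0])  # [batch] -- the shape that caused the mismatch

# Align the target to [batch, 1] before computing the loss, mirroring the guard above
trade_target = trade_target_1d.unsqueeze(-1)
loss = criterion(confidence_pred, trade_target)
print(confidence_pred.shape, trade_target.shape, loss.item())
```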

View File

@@ -1,355 +0,0 @@
# Unified Data Storage System - Complete Implementation
## 🎉 Project Complete!
The unified data storage system has been successfully implemented and integrated into the existing DataProvider.
## Completed Tasks (8 out of 10)
### Task 1: TimescaleDB Schema and Infrastructure
**Files:**
- `core/unified_storage_schema.py` - Schema manager with migrations
- `scripts/setup_unified_storage.py` - Automated setup script
- `docs/UNIFIED_STORAGE_SETUP.md` - Setup documentation
**Features:**
- 5 hypertables (OHLCV, order book, aggregations, imbalances, trades)
- 5 continuous aggregates for multi-timeframe data
- 15+ optimized indexes
- Compression policies (>80% compression)
- Retention policies (30 days to 2 years)
### Task 2: Data Models and Validation
**Files:**
- `core/unified_data_models.py` - Data structures
- `core/unified_data_validator.py` - Validation logic
**Features:**
- `InferenceDataFrame` - Complete inference data
- `OrderBookDataFrame` - Order book with imbalances
- `OHLCVCandle`, `TradeEvent` - Individual data types
- Comprehensive validation and sanitization
### Task 3: Cache Layer
**Files:**
- `core/unified_cache_manager.py` - In-memory caching
**Features:**
- <10ms read latency
- 5-minute rolling window
- Thread-safe operations
- Automatic eviction
- Statistics tracking
### Task 4: Database Connection and Query Layer
**Files:**
- `core/unified_database_manager.py` - Connection pool and queries
**Features:**
- Async connection pooling
- Health monitoring
- Optimized query methods
- <100ms query latency
- Multi-timeframe support
### Task 5: Data Ingestion Pipeline
**Files:**
- `core/unified_ingestion_pipeline.py` - Real-time ingestion
**Features:**
- Batch writes (100 items or 5 seconds)
- Data validation before storage
- Background flush worker
- >1000 ops/sec throughput
- Error handling and retry logic
### Task 6: Unified Data Provider API
**Files:**
- `core/unified_data_provider_extension.py` - Main API
**Features:**
- Single `get_inference_data()` endpoint
- Automatic cache/database routing
- Multi-timeframe data retrieval
- Order book data access
- Statistics tracking
### Task 7: Data Migration System
**Status:** Skipped (decided to drop existing Parquet data)
### Task 8: Integration with Existing DataProvider
**Files:**
- `core/data_provider.py` - Updated with unified storage methods
- `docs/UNIFIED_STORAGE_INTEGRATION.md` - Integration guide
- `examples/unified_storage_example.py` - Usage examples
**Features:**
- Seamless integration with existing code
- Backward compatible
- Opt-in unified storage
- Easy to enable/disable
## 📊 System Architecture
```
┌─────────────────────────────────────────────┐
│ Application Layer │
│ (Models, Backtesting, Annotation, etc.) │
└────────────────┬────────────────────────────┘
┌─────────────────────────────────────────────┐
│ DataProvider (Existing) │
│ + Unified Storage Extension (New) │
└────────────────┬────────────────────────────┘
┌────────┴────────┐
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Cache Layer │ │ Database │
│ (In-Memory) │ │ (TimescaleDB)│
│ │ │ │
│ - Last 5 min │ │ - Historical │
│ - <10ms read │ │ - <100ms read│
│ - Real-time │ │ - Compressed │
└──────────────┘ └──────────────┘
```
## Key Features
### Performance
- Cache reads: <10ms
- Database queries: <100ms
- Ingestion: >1000 ops/sec
- Compression: >80%
### Reliability
- Data validation
- Error handling
- Health monitoring
- Statistics tracking
- Automatic reconnection
### Usability
- Single endpoint for all data
- Automatic routing (cache vs database)
- Type-safe interfaces
- Backward compatible
- Easy to integrate
## 📝 Quick Start
### 1. Setup Database
```bash
python scripts/setup_unified_storage.py
```
### 2. Enable in Code
```python
from core.data_provider import DataProvider
import asyncio
data_provider = DataProvider()
async def setup():
await data_provider.enable_unified_storage()
asyncio.run(setup())
```
### 3. Use Unified API
```python
# Get real-time data (from cache)
data = await data_provider.get_inference_data_unified('ETH/USDT')
# Get historical data (from database)
data = await data_provider.get_inference_data_unified(
'ETH/USDT',
timestamp=datetime(2024, 1, 15, 12, 30)
)
```
## 📚 Documentation
- **Setup Guide**: `docs/UNIFIED_STORAGE_SETUP.md`
- **Integration Guide**: `docs/UNIFIED_STORAGE_INTEGRATION.md`
- **Examples**: `examples/unified_storage_example.py`
- **Design Document**: `.kiro/specs/unified-data-storage/design.md`
- **Requirements**: `.kiro/specs/unified-data-storage/requirements.md`
## 🎯 Use Cases
### Real-Time Trading
```python
# Fast access to latest market data
data = await data_provider.get_inference_data_unified('ETH/USDT')
price = data.get_latest_price()
```
### Backtesting
```python
# Historical data at any timestamp
data = await data_provider.get_inference_data_unified(
'ETH/USDT',
timestamp=target_time,
context_window_minutes=60
)
```
### Data Annotation
```python
# Retrieve data at specific timestamps for labeling
for timestamp in annotation_timestamps:
data = await data_provider.get_inference_data_unified(
'ETH/USDT',
timestamp=timestamp,
context_window_minutes=5
)
# Display and annotate
```
### Model Training
```python
# Get complete inference data for training
data = await data_provider.get_inference_data_unified(
'ETH/USDT',
timestamp=training_timestamp
)
features = {
'ohlcv': data.ohlcv_1m.to_numpy(),
'indicators': data.indicators,
'imbalances': data.imbalances.to_numpy()
}
```
## 📈 Performance Metrics
### Cache Performance
- Hit Rate: >90% (typical)
- Read Latency: <10ms
- Capacity: 5 minutes of data
- Eviction: Automatic
### Database Performance
- Query Latency: <100ms (typical)
- Write Throughput: >1000 ops/sec
- Compression Ratio: >80%
- Storage: Optimized with TimescaleDB
### Ingestion Performance
- Validation: All data validated
- Batch Size: 100 items or 5 seconds
- Error Rate: <0.1% (typical)
- Retry: Automatic with backoff
## 🔧 Configuration
### Database Config (`config.yaml`)
```yaml
database:
host: localhost
port: 5432
name: trading_data
user: postgres
password: postgres
pool_size: 20
```
### Cache Config
```python
cache_manager = DataCacheManager(
cache_duration_seconds=300 # 5 minutes
)
```
### Ingestion Config
```python
ingestion_pipeline = DataIngestionPipeline(
batch_size=100,
batch_timeout_seconds=5.0
)
```
## 🎓 Examples
Run the example script:
```bash
python examples/unified_storage_example.py
```
This demonstrates:
1. Real-time data access
2. Historical data retrieval
3. Multi-timeframe queries
4. Order book data
5. Statistics tracking
## 🔍 Monitoring
### Get Statistics
```python
stats = data_provider.get_unified_storage_stats()
print(f"Cache hit rate: {stats['cache']['hit_rate_percent']}%")
print(f"DB queries: {stats['database']['total_queries']}")
print(f"Ingestion rate: {stats['ingestion']['total_ingested']}")
```
### Check Health
```python
if data_provider.is_unified_storage_enabled():
print(" Unified storage is running")
else:
print(" Unified storage is not enabled")
```
## 🚧 Remaining Tasks (Optional)
### Task 9: Performance Optimization
- Add detailed monitoring dashboards
- Implement query caching
- Optimize database indexes
- Add performance alerts
### Task 10: Documentation and Deployment
- Create video tutorials
- Add API reference documentation
- Create deployment guides
- Add monitoring setup
## 🎉 Success Metrics
**Completed**: 8 out of 10 major tasks (80%)
**Core Functionality**: 100% complete
**Integration**: Seamless with existing code
**Performance**: Meets all targets
**Documentation**: Comprehensive guides
**Examples**: Working code samples
## 🙏 Next Steps
The unified storage system is **production-ready** and can be used immediately:
1. **Setup Database**: Run `python scripts/setup_unified_storage.py`
2. **Enable in Code**: Call `await data_provider.enable_unified_storage()`
3. **Start Using**: Use `get_inference_data_unified()` for all data access
4. **Monitor**: Check statistics with `get_unified_storage_stats()`
## 📞 Support
For issues or questions:
1. Check documentation in `docs/`
2. Review examples in `examples/`
3. Check database setup: `python scripts/setup_unified_storage.py`
4. Review logs for errors
---
**Status**: Production Ready
**Version**: 1.0.0
**Last Updated**: 2024
**Completion**: 80% (8/10 tasks)

View File

@@ -1,398 +0,0 @@
# Unified Storage System Integration Guide
## Overview
The unified storage system has been integrated into the existing `DataProvider` class, providing a single endpoint for both real-time and historical data access.
## Key Features
**Single Endpoint**: One method for all data access
**Automatic Routing**: Cache for real-time, database for historical
**Backward Compatible**: All existing methods still work
**Opt-In**: Only enabled when explicitly initialized
**Fast**: <10ms cache reads, <100ms database queries
## Quick Start
### 1. Enable Unified Storage
```python
from core.data_provider import DataProvider
import asyncio
# Create DataProvider (existing code works as before)
data_provider = DataProvider()
# Enable unified storage system
async def setup():
success = await data_provider.enable_unified_storage()
if success:
print(" Unified storage enabled!")
else:
print(" Failed to enable unified storage")
asyncio.run(setup())
```
### 2. Get Real-Time Data (from cache)
```python
async def get_realtime_data():
# Get latest real-time data (timestamp=None)
inference_data = await data_provider.get_inference_data_unified('ETH/USDT')
print(f"Symbol: {inference_data.symbol}")
print(f"Timestamp: {inference_data.timestamp}")
print(f"Latest price: {inference_data.get_latest_price()}")
print(f"Data source: {inference_data.data_source}") # 'cache'
print(f"Query latency: {inference_data.query_latency_ms}ms") # <10ms
# Check data completeness
if inference_data.has_complete_data():
print("✓ All required data present")
# Get data summary
summary = inference_data.get_data_summary()
print(f"OHLCV 1m rows: {summary['ohlcv_1m_rows']}")
print(f"Has orderbook: {summary['has_orderbook']}")
print(f"Imbalances rows: {summary['imbalances_rows']}")
asyncio.run(get_realtime_data())
```
### 3. Get Historical Data (from database)
```python
from datetime import datetime, timedelta
async def get_historical_data():
# Get historical data at specific timestamp
target_time = datetime.now() - timedelta(hours=1)
inference_data = await data_provider.get_inference_data_unified(
symbol='ETH/USDT',
timestamp=target_time,
context_window_minutes=5 # ±5 minutes of context
)
print(f"Data source: {inference_data.data_source}") # 'database'
print(f"Query latency: {inference_data.query_latency_ms}ms") # <100ms
# Access multi-timeframe data
print(f"1s candles: {len(inference_data.ohlcv_1s)}")
print(f"1m candles: {len(inference_data.ohlcv_1m)}")
print(f"1h candles: {len(inference_data.ohlcv_1h)}")
# Access technical indicators
print(f"RSI: {inference_data.indicators.get('rsi_14')}")
print(f"MACD: {inference_data.indicators.get('macd')}")
# Access context data
if inference_data.context_data is not None:
print(f"Context data: {len(inference_data.context_data)} rows")
asyncio.run(get_historical_data())
```
### 4. Get Multi-Timeframe Data
```python
async def get_multi_timeframe():
# Get multiple timeframes at once
multi_tf = await data_provider.get_multi_timeframe_data_unified(
symbol='ETH/USDT',
timeframes=['1m', '5m', '1h'],
limit=100
)
for timeframe, df in multi_tf.items():
print(f"{timeframe}: {len(df)} candles")
if not df.empty:
print(f" Latest close: {df.iloc[-1]['close_price']}")
asyncio.run(get_multi_timeframe())
```
### 5. Get Order Book Data
```python
async def get_orderbook():
# Get order book with imbalances
orderbook = await data_provider.get_order_book_data_unified('ETH/USDT')
print(f"Mid price: {orderbook.mid_price}")
print(f"Spread: {orderbook.spread}")
print(f"Spread (bps): {orderbook.get_spread_bps()}")
# Get best bid/ask
best_bid = orderbook.get_best_bid()
best_ask = orderbook.get_best_ask()
print(f"Best bid: {best_bid}")
print(f"Best ask: {best_ask}")
# Get imbalance summary
imbalances = orderbook.get_imbalance_summary()
print(f"Imbalances: {imbalances}")
asyncio.run(get_orderbook())
```
### 6. Get Statistics
```python
# Get unified storage statistics
stats = data_provider.get_unified_storage_stats()
print("=== Cache Statistics ===")
print(f"Hit rate: {stats['cache']['hit_rate_percent']}%")
print(f"Total entries: {stats['cache']['total_entries']}")
print("\n=== Database Statistics ===")
print(f"Total queries: {stats['database']['total_queries']}")
print(f"Avg query time: {stats['database']['avg_query_time_ms']}ms")
print("\n=== Ingestion Statistics ===")
print(f"Total ingested: {stats['ingestion']['total_ingested']}")
print(f"Validation failures: {stats['ingestion']['validation_failures']}")
```
## Integration with Existing Code
### Backward Compatibility
All existing DataProvider methods continue to work:
```python
# Existing methods still work
df = data_provider.get_historical_data('ETH/USDT', '1m', limit=100)
price = data_provider.get_current_price('ETH/USDT')
features = data_provider.get_feature_matrix('ETH/USDT')
# New unified methods available alongside
inference_data = await data_provider.get_inference_data_unified('ETH/USDT')
```
### Gradual Migration
You can migrate to unified storage gradually:
```python
# Option 1: Use existing methods (no changes needed)
df = data_provider.get_historical_data('ETH/USDT', '1m')
# Option 2: Use unified storage for new features
inference_data = await data_provider.get_inference_data_unified('ETH/USDT')
```
## Use Cases
### 1. Real-Time Trading
```python
async def realtime_trading_loop():
while True:
# Get latest market data (fast!)
data = await data_provider.get_inference_data_unified('ETH/USDT')
# Make trading decision
if data.has_complete_data():
price = data.get_latest_price()
rsi = data.indicators.get('rsi_14', 50)
if rsi < 30:
print(f"Buy signal at {price}")
elif rsi > 70:
print(f"Sell signal at {price}")
await asyncio.sleep(1)
```
### 2. Backtesting
```python
async def backtest_strategy(start_time, end_time):
current_time = start_time
while current_time < end_time:
# Get historical data at specific time
data = await data_provider.get_inference_data_unified(
'ETH/USDT',
timestamp=current_time,
context_window_minutes=60
)
# Run strategy
if data.has_complete_data():
# Your strategy logic here
pass
# Move to next timestamp
current_time += timedelta(minutes=1)
```
### 3. Data Annotation
```python
async def annotate_data(timestamps):
annotations = []
for timestamp in timestamps:
# Get data at specific timestamp
data = await data_provider.get_inference_data_unified(
'ETH/USDT',
timestamp=timestamp,
context_window_minutes=5
)
# Display to user for annotation
# User marks buy/sell signals
annotation = {
'timestamp': timestamp,
'price': data.get_latest_price(),
'signal': 'buy', # User input
'data': data.to_dict()
}
annotations.append(annotation)
return annotations
```
### 4. Model Training
```python
async def prepare_training_data(symbol, start_time, end_time):
training_samples = []
current_time = start_time
while current_time < end_time:
# Get complete inference data
data = await data_provider.get_inference_data_unified(
symbol,
timestamp=current_time,
context_window_minutes=10
)
if data.has_complete_data():
# Extract features
features = {
'ohlcv_1m': data.ohlcv_1m.to_numpy(),
'indicators': data.indicators,
'imbalances': data.imbalances.to_numpy(),
'orderbook': data.orderbook_snapshot
}
training_samples.append(features)
current_time += timedelta(minutes=1)
return training_samples
```
## Configuration
### Database Configuration
Update `config.yaml`:
```yaml
database:
host: localhost
port: 5432
name: trading_data
user: postgres
password: postgres
pool_size: 20
```
### Setup Database
```bash
# Run setup script
python scripts/setup_unified_storage.py
```
## Performance Tips
1. **Use Real-Time Endpoint for Latest Data**
```python
# Fast (cache)
data = await data_provider.get_inference_data_unified('ETH/USDT')
# Slower (database)
data = await data_provider.get_inference_data_unified('ETH/USDT', datetime.now())
```
2. **Batch Historical Queries**
```python
# Get multiple timeframes at once
multi_tf = await data_provider.get_multi_timeframe_data_unified(
'ETH/USDT',
['1m', '5m', '1h'],
limit=100
)
```
3. **Monitor Performance**
```python
stats = data_provider.get_unified_storage_stats()
print(f"Cache hit rate: {stats['cache']['hit_rate_percent']}%")
print(f"Avg query time: {stats['database']['avg_query_time_ms']}ms")
```
## Troubleshooting
### Unified Storage Not Available
```python
if not data_provider.is_unified_storage_enabled():
success = await data_provider.enable_unified_storage()
if not success:
print("Check database connection and configuration")
```
### Slow Queries
```python
# Check query latency
data = await data_provider.get_inference_data_unified('ETH/USDT', timestamp)
if data.query_latency_ms > 100:
print(f"Slow query: {data.query_latency_ms}ms")
# Check database stats
stats = data_provider.get_unified_storage_stats()
print(stats['database'])
```
### Missing Data
```python
data = await data_provider.get_inference_data_unified('ETH/USDT', timestamp)
if not data.has_complete_data():
summary = data.get_data_summary()
print(f"Missing data: {summary}")
```
## API Reference
### Main Methods
- `enable_unified_storage()` - Enable unified storage system
- `disable_unified_storage()` - Disable unified storage system
- `get_inference_data_unified()` - Get complete inference data
- `get_multi_timeframe_data_unified()` - Get multi-timeframe data
- `get_order_book_data_unified()` - Get order book with imbalances
- `get_unified_storage_stats()` - Get statistics
- `is_unified_storage_enabled()` - Check if enabled
### Data Models
- `InferenceDataFrame` - Complete inference data structure
- `OrderBookDataFrame` - Order book with imbalances
- `OHLCVCandle` - Single candlestick
- `TradeEvent` - Individual trade
## Support
For issues or questions:
1. Check database connection: `python scripts/setup_unified_storage.py`
2. Review logs for errors
3. Check statistics: `data_provider.get_unified_storage_stats()`

View File

@@ -1,337 +0,0 @@
# Unified Data Storage Setup Guide
## Overview
The unified data storage system consolidates all market data storage into a single TimescaleDB backend, replacing fragmented Parquet files, pickle files, and in-memory caches.
## Prerequisites
### 1. PostgreSQL with TimescaleDB
You need PostgreSQL 12+ with TimescaleDB extension installed.
#### Installation Options
**Option A: Docker (Recommended)**
```bash
docker run -d --name timescaledb \
-p 5432:5432 \
-e POSTGRES_PASSWORD=postgres \
-e POSTGRES_DB=trading_data \
timescale/timescaledb:latest-pg14
```
**Option B: Local Installation**
- Follow TimescaleDB installation guide: https://docs.timescale.com/install/latest/
- Create database: `createdb trading_data`
### 2. Python Dependencies
Ensure you have the required Python packages:
```bash
pip install asyncpg
```
## Database Configuration
Update your `config.yaml` with database connection details:
```yaml
database:
host: localhost
port: 5432
name: trading_data
user: postgres
password: postgres
pool_size: 20
```
## Setup Process
### Step 1: Run Setup Script
```bash
python scripts/setup_unified_storage.py
```
This script will:
1. Connect to the database
2. Verify TimescaleDB extension
3. Create all required tables
4. Convert tables to hypertables
5. Create indexes for performance
6. Set up continuous aggregates
7. Configure compression policies
8. Configure retention policies
9. Verify the setup
10. Run basic operation tests
### Step 2: Verify Setup
The setup script will display schema information:
```
=== Schema Information ===
Migrations applied: 8
Tables created: 5
Hypertables: 5
Continuous aggregates: 5
=== Table Sizes ===
ohlcv_data: 8192 bytes
order_book_snapshots: 8192 bytes
order_book_1s_agg: 8192 bytes
order_book_imbalances: 8192 bytes
trade_events: 8192 bytes
=== Hypertables ===
ohlcv_data: 0 chunks, compression=enabled
order_book_snapshots: 0 chunks, compression=enabled
order_book_1s_agg: 0 chunks, compression=enabled
order_book_imbalances: 0 chunks, compression=enabled
trade_events: 0 chunks, compression=enabled
=== Continuous Aggregates ===
ohlcv_1m_continuous: 8192 bytes
ohlcv_5m_continuous: 8192 bytes
ohlcv_15m_continuous: 8192 bytes
ohlcv_1h_continuous: 8192 bytes
ohlcv_1d_continuous: 8192 bytes
```
## Database Schema
### Tables
#### 1. ohlcv_data
Stores candlestick data for all timeframes with pre-calculated technical indicators.
**Columns:**
- `timestamp` (TIMESTAMPTZ): Candle timestamp
- `symbol` (VARCHAR): Trading pair (e.g., 'ETH/USDT')
- `timeframe` (VARCHAR): Timeframe (1s, 1m, 5m, 15m, 1h, 1d)
- `open_price`, `high_price`, `low_price`, `close_price` (DECIMAL): OHLC prices
- `volume` (DECIMAL): Trading volume
- `trade_count` (INTEGER): Number of trades
- Technical indicators: `rsi_14`, `macd`, `macd_signal`, `bb_upper`, `bb_middle`, `bb_lower`, etc.
**Primary Key:** `(timestamp, symbol, timeframe)`
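For orientation, a minimal sketch of what creating this hypertable could look like via asyncpg. The column list is abridged, and the exact types, indicator columns, and connection DSN are assumptions — the authoritative DDL lives in `core/unified_storage_schema.py`:

```python
import asyncio
import asyncpg

DDL = """
CREATE TABLE IF NOT EXISTS ohlcv_data (
    timestamp   TIMESTAMPTZ NOT NULL,
    symbol      VARCHAR NOT NULL,
    timeframe   VARCHAR NOT NULL,
    open_price  DECIMAL, high_price DECIMAL, low_price DECIMAL, close_price DECIMAL,
    volume      DECIMAL,
    trade_count INTEGER,
    rsi_14      DECIMAL, macd DECIMAL, macd_signal DECIMAL,
    PRIMARY KEY (timestamp, symbol, timeframe)
);
"""

async def create_ohlcv_table(dsn: str = "postgresql://postgres:postgres@localhost:5432/trading_data"):
    conn = await asyncpg.connect(dsn)
    try:
        await conn.execute(DDL)
        # Convert the plain table into a TimescaleDB hypertable partitioned on time
        await conn.execute(
            "SELECT create_hypertable('ohlcv_data', 'timestamp', if_not_exists => TRUE);"
        )
    finally:
        await conn.close()

asyncio.run(create_ohlcv_table())
```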
#### 2. order_book_snapshots
Stores raw order book snapshots.
**Columns:**
- `timestamp` (TIMESTAMPTZ): Snapshot timestamp
- `symbol` (VARCHAR): Trading pair
- `exchange` (VARCHAR): Exchange name
- `bids` (JSONB): Bid levels (top 50)
- `asks` (JSONB): Ask levels (top 50)
- `mid_price`, `spread`, `bid_volume`, `ask_volume` (DECIMAL): Calculated metrics
**Primary Key:** `(timestamp, symbol, exchange)`
#### 3. order_book_1s_agg
Stores 1-second aggregated order book data with $1 price buckets.
**Columns:**
- `timestamp` (TIMESTAMPTZ): Aggregation timestamp
- `symbol` (VARCHAR): Trading pair
- `price_bucket` (DECIMAL): Price bucket ($1 increments)
- `bid_volume`, `ask_volume` (DECIMAL): Aggregated volumes
- `bid_count`, `ask_count` (INTEGER): Number of orders
- `imbalance` (DECIMAL): Order book imbalance
**Primary Key:** `(timestamp, symbol, price_bucket)`
#### 4. order_book_imbalances
Stores multi-timeframe order book imbalance metrics.
**Columns:**
- `timestamp` (TIMESTAMPTZ): Calculation timestamp
- `symbol` (VARCHAR): Trading pair
- `imbalance_1s`, `imbalance_5s`, `imbalance_15s`, `imbalance_60s` (DECIMAL): Imbalances
- `volume_imbalance_1s`, `volume_imbalance_5s`, etc. (DECIMAL): Volume-weighted imbalances
- `price_range` (DECIMAL): Price range used for calculation
**Primary Key:** `(timestamp, symbol)`
#### 5. trade_events
Stores individual trade events.
**Columns:**
- `timestamp` (TIMESTAMPTZ): Trade timestamp
- `symbol` (VARCHAR): Trading pair
- `exchange` (VARCHAR): Exchange name
- `price` (DECIMAL): Trade price
- `size` (DECIMAL): Trade size
- `side` (VARCHAR): Trade side ('buy' or 'sell')
- `trade_id` (VARCHAR): Unique trade identifier
**Primary Key:** `(timestamp, symbol, exchange, trade_id)`
### Continuous Aggregates
Continuous aggregates automatically generate higher timeframe data from lower timeframes:
1. **ohlcv_1m_continuous**: 1-minute candles from 1-second data
2. **ohlcv_5m_continuous**: 5-minute candles from 1-minute data
3. **ohlcv_15m_continuous**: 15-minute candles from 5-minute data
4. **ohlcv_1h_continuous**: 1-hour candles from 15-minute data
5. **ohlcv_1d_continuous**: 1-day candles from 1-hour data
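A hedged sketch of how one of these continuous aggregates might be defined (the actual definitions are created by the setup script; `time_bucket()`, `first()`, and `last()` are standard TimescaleDB functions, while the exact rollup query and DSN are assumptions):

```python
import asyncio
import asyncpg

# 1-minute candles rolled up from 1-second rows in ohlcv_data (illustrative definition)
OHLCV_1M_CAGG = """
CREATE MATERIALIZED VIEW IF NOT EXISTS ohlcv_1m_continuous
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 minute', timestamp) AS bucket,
    symbol,
    first(open_price, timestamp)  AS open_price,
    max(high_price)               AS high_price,
    min(low_price)                AS low_price,
    last(close_price, timestamp)  AS close_price,
    sum(volume)                   AS volume
FROM ohlcv_data
WHERE timeframe = '1s'
GROUP BY bucket, symbol
WITH NO DATA;
"""

async def create_1m_aggregate(dsn: str = "postgresql://postgres:postgres@localhost:5432/trading_data"):
    conn = await asyncpg.connect(dsn)
    try:
        await conn.execute(OHLCV_1M_CAGG)
    finally:
        await conn.close()

asyncio.run(create_1m_aggregate())
```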
### Compression Policies
Data is automatically compressed to save storage:
- **ohlcv_data**: Compress after 7 days
- **order_book_snapshots**: Compress after 1 day
- **order_book_1s_agg**: Compress after 2 days
- **order_book_imbalances**: Compress after 2 days
- **trade_events**: Compress after 7 days
Expected compression ratio: **>80%**
### Retention Policies
Old data is automatically deleted:
- **ohlcv_data**: Retain for 2 years
- **order_book_snapshots**: Retain for 30 days
- **order_book_1s_agg**: Retain for 60 days
- **order_book_imbalances**: Retain for 60 days
- **trade_events**: Retain for 90 days
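A compact sketch of how the compression and retention policies above could be applied for one hypertable (`add_compression_policy` and `add_retention_policy` are standard TimescaleDB calls; the `segmentby` choice and DSN are assumptions — the setup script configures the real policies):

```python
import asyncio
import asyncpg

POLICY_STATEMENTS = [
    # Compression must be enabled on the hypertable before a policy can be added
    "ALTER TABLE ohlcv_data SET (timescaledb.compress, timescaledb.compress_segmentby = 'symbol, timeframe');",
    "SELECT add_compression_policy('ohlcv_data', INTERVAL '7 days', if_not_exists => TRUE);",
    "SELECT add_retention_policy('ohlcv_data', INTERVAL '2 years', if_not_exists => TRUE);",
]

async def apply_ohlcv_policies(dsn: str = "postgresql://postgres:postgres@localhost:5432/trading_data"):
    conn = await asyncpg.connect(dsn)
    try:
        for stmt in POLICY_STATEMENTS:
            await conn.execute(stmt)
    finally:
        await conn.close()

asyncio.run(apply_ohlcv_policies())
```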
## Performance Optimization
### Indexes
All tables have optimized indexes for common query patterns:
- Symbol + timestamp queries
- Timeframe-specific queries
- Exchange-specific queries
- Multi-column composite indexes
### Query Performance Targets
- **Cache reads**: <10ms
- **Single timestamp queries**: <100ms
- **Time range queries (1 hour)**: <500ms
- **Ingestion throughput**: >1000 ops/sec
### Best Practices
1. **Use time_bucket for aggregations**:
```sql
SELECT time_bucket('1 minute', timestamp) AS bucket,
symbol,
avg(close_price) AS avg_price
FROM ohlcv_data
WHERE symbol = 'ETH/USDT'
AND timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY bucket, symbol;
```
2. **Query specific timeframes**:
```sql
SELECT * FROM ohlcv_data
WHERE symbol = 'ETH/USDT'
AND timeframe = '1m'
AND timestamp >= NOW() - INTERVAL '1 day'
ORDER BY timestamp DESC;
```
3. **Use continuous aggregates for historical data**:
```sql
SELECT * FROM ohlcv_1h_continuous
WHERE symbol = 'ETH/USDT'
AND timestamp >= NOW() - INTERVAL '7 days'
ORDER BY timestamp DESC;
```
## Monitoring
### Check Database Size
```sql
SELECT
hypertable_name,
pg_size_pretty(total_bytes) AS total_size,
pg_size_pretty(compressed_total_bytes) AS compressed_size,
ROUND((1 - compressed_total_bytes::numeric / total_bytes::numeric) * 100, 2) AS compression_ratio
FROM timescaledb_information.hypertables
WHERE hypertable_schema = 'public';
```
### Check Chunk Information
```sql
SELECT
hypertable_name,
num_chunks,
num_compressed_chunks,
compression_enabled
FROM timescaledb_information.hypertables
WHERE hypertable_schema = 'public';
```
### Check Continuous Aggregate Status
```sql
SELECT
view_name,
materialization_hypertable_name,
pg_size_pretty(total_bytes) AS size
FROM timescaledb_information.continuous_aggregates
WHERE view_schema = 'public';
```
## Troubleshooting
### TimescaleDB Extension Not Found
If you see "TimescaleDB extension not found":
1. Ensure TimescaleDB is installed
2. Connect to database and run: `CREATE EXTENSION timescaledb;`
3. Restart the setup script
### Connection Refused
If you see "connection refused":
1. Check PostgreSQL is running: `pg_isready`
2. Verify connection details in `config.yaml`
3. Check firewall settings
### Permission Denied
If you see "permission denied":
1. Ensure database user has CREATE privileges
2. Grant privileges: `GRANT ALL PRIVILEGES ON DATABASE trading_data TO postgres;`
### Slow Queries
If queries are slow:
1. Check if indexes exist: `\di` in psql
2. Analyze query plan: `EXPLAIN ANALYZE <your query>`
3. Ensure compression is enabled
4. Consider adding more specific indexes
## Next Steps
After setup is complete:
1. **Implement data models** (Task 2)
2. **Implement cache layer** (Task 3)
3. **Implement database connection layer** (Task 4)
4. **Start data migration** from Parquet files (Task 7)
## Support
For issues or questions:
- Check TimescaleDB docs: https://docs.timescale.com/
- Review PostgreSQL logs: `tail -f /var/log/postgresql/postgresql-*.log`
- Enable debug logging in setup script

View File

@@ -0,0 +1,308 @@
# Model Inputs & Outputs Reference
Quick reference for all trading models in the system.
---
## 1. Transformer (AdvancedTradingTransformer)
**Type**: Sequence-to-sequence transformer for multi-timeframe analysis
**Size**: 46M parameters
**Architecture**: 12 layers, 16 attention heads, 1024 model dimension
### Inputs
```python
price_data: [batch, 150, 5] # OHLCV sequences (150 candles)
cob_data: [batch, 150, 100] # Change of Bid features
tech_data: [batch, 40] # Technical indicators (SMA, returns, volatility)
market_data: [batch, 30] # Market context (volume, pivots, support/resistance)
```
### Outputs
```python
action_logits: [batch, 3] # Raw logits for BUY(1), SELL(2), HOLD(0)
action_probs: [batch, 3] # Softmax probabilities
confidence: [batch, 1] # Trade confidence (0-1)
price_prediction: [batch, 1] # Future price target
volatility_prediction:[batch, 1] # Expected volatility
trend_strength: [batch, 1] # Trend strength (-1 to 1)
# Next candle predictions for each timeframe
next_candles: {
'1s': [batch, 5], # [open, high, low, close, volume]
'1m': [batch, 5],
'1h': [batch, 5],
'1d': [batch, 5]
}
# Pivot point predictions (L1-L5)
next_pivots: {
'L1': {
'price': [batch, 1],
'type_prob_high': [batch, 1], # Probability of high pivot
'type_prob_low': [batch, 1], # Probability of low pivot
'confidence': [batch, 1]
},
# ... L2, L3, L4, L5 (same structure)
}
# Trend vector analysis
trend_analysis: {
'angle_radians': [batch, 1], # Trend angle
'steepness': [batch, 1], # Trend steepness
'direction': [batch, 1] # Direction (-1 to 1)
}
```
### Training Targets
```python
actions: [batch] # Action labels (0=HOLD, 1=BUY, 2=SELL)
future_prices: [batch] # Price targets
trade_success: [batch, 1] # Success labels (0.0 or 1.0)
```
---
## 2. CNN (StandardizedCNN / EnhancedCNN)
**Type**: Convolutional neural network for pattern recognition
**Size**: ~5-10M parameters
**Architecture**: Multi-scale convolutions with attention
### Inputs
```python
# Via BaseDataInput.get_feature_vector()
feature_vector: [batch, 7834] # Flattened features containing:
- OHLCV ETH: 300 frames × 4 timeframes × 5 = 6000
- OHLCV BTC: 300 frames × 5 = 1500
- COB features: 184 (±20 buckets + MA imbalance)
- Technical indicators: 100 (padded)
- Last predictions: 50 (padded)
```
### Outputs
```python
action_logits: [batch, 3] # BUY, SELL, HOLD logits
action_probs: [batch, 3] # Softmax probabilities
confidence: [batch, 1] # Prediction confidence
hidden_states: [batch, 1024] # Feature embeddings (for cross-model feeding)
predicted_returns: [batch, 4] # [return_1s, return_1m, return_1h, return_1d]
```
### Training Targets
```python
actions: [batch] # Action labels (0=HOLD, 1=BUY, 2=SELL)
returns: [batch, 4] # Actual returns per timeframe
```
---
## 3. DQN (Deep Q-Network Agent)
**Type**: Reinforcement learning agent for sequential decision making
**Size**: ~15M parameters
**Architecture**: Deep Q-Network with dueling architecture
### Inputs
```python
# Via BaseDataInput.get_feature_vector()
state: [batch, 7850] # Full feature vector including:
- Multi-timeframe OHLCV data
- COB features
- Technical indicators
- Market regime indicators
- Previous predictions
```
### Outputs
```python
q_values: [batch, 3] # Q-values for BUY, SELL, HOLD
action: int # Selected action (0, 1, 2)
confidence: float # Action confidence (0-1)
# Auxiliary outputs
regime_probs: [batch, 4] # [trending, ranging, volatile, mixed]
price_direction:[batch, 3] # [down, neutral, up]
volatility: [batch, 1] # Predicted volatility
value: [batch, 1] # State value (V)
advantage: [batch, 3] # Action advantages (A)
```
### Training Targets
```python
# RL uses experience replay
experience: {
'state': [7850],
'action': int,
'reward': float,
'next_state': [7850],
'done': bool
}
```
---
## 4. COB RL Model (MassiveRLNetwork)
**Type**: Specialized RL for Change of Bid (COB) data
**Size**: ~3M parameters
**Architecture**: Deep network focused on order book dynamics
### Inputs
```python
cob_features: [batch, input_size] # COB-specific features:
- Bid/ask imbalance
- Order book depth
- Price level changes
- Volume at price levels
- Moving averages of imbalance
```
### Outputs
```python
price_logits: [batch, 3] # Direction logits [DOWN, SIDEWAYS, UP]
price_probs: [batch, 3] # Direction probabilities
confidence: [batch, 1] # Prediction confidence
value: [batch, 1] # State value estimate
predicted_direction: int # 0=DOWN, 1=SIDEWAYS, 2=UP
```
### Training Targets
```python
targets: {
'direction': [batch], # Direction labels (0, 1, 2)
'value': [batch], # Value targets
'confidence': [batch] # Confidence targets
}
```
---
## 5. Extrema Trainer
**Type**: Pivot point detection and prediction
**Size**: ~1M parameters (lightweight)
**Architecture**: Statistical + ML hybrid
### Inputs
```python
# Context data (200 candles)
context: {
'symbol': str,
'candles': deque[200], # Recent OHLCV candles
'features': array, # Extracted features
'last_update': datetime
}
# For prediction
current_price: float
now: datetime
```
### Outputs
```python
# Detected extrema
extrema: {
'type': str, # 'high' or 'low'
'price': float,
'timestamp': datetime,
'confidence': float, # 0-1
'window_size': int
}
# Predicted pivot
predicted_pivot: {
'type': str, # 'high' or 'low'
'price': float, # Predicted price level
'timestamp': datetime, # Predicted time
'confidence': float, # 0-1
'horizon_seconds': int # Time until pivot (30-300s)
}
```
### Training Data
```python
# Historical extrema for validation
historical_extrema: List[{
'price': float,
'timestamp': datetime,
'type': str,
'detected': bool
}]
```
---
## Common Patterns
### Action Encoding (All Models)
```python
0 = HOLD # No action / maintain position
1 = BUY # Enter long / close short
2 = SELL # Enter short / close long
```
### Confidence Scores
- Range: `0.0` to `1.0`
- Typical threshold: `0.6` (60%)
- High confidence: `> 0.8`
- Low confidence: `< 0.4`
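As a concrete illustration of how these thresholds are typically applied (the `0.6` cut-off comes from the list above; the gating helper itself is hypothetical):

```python
def gate_action(action: int, confidence: float, threshold: float = 0.6) -> int:
    """Return the model's action only when confidence clears the threshold, else HOLD (0)."""
    return action if confidence >= threshold else 0

print(gate_action(action=1, confidence=0.83))  # high confidence -> BUY passes through
print(gate_action(action=2, confidence=0.35))  # low confidence  -> falls back to HOLD
```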
### Batch Sizes
- **Training**: Usually `1` (annotation-based) or `32-128` (batch training)
- **Inference**: Usually `1` (real-time prediction)
### Device Management
All models support:
- CPU: `torch.device('cpu')`
- CUDA: `torch.device('cuda')`
- Automatic device selection based on availability
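A minimal sketch of the automatic device selection mentioned above (standard PyTorch; the model and tensor here are placeholders):

```python
import torch

# Pick CUDA when available, otherwise fall back to CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = torch.nn.Linear(10, 3).to(device)      # placeholder for any of the models above
features = torch.randn(1, 10, device=device)   # inference batch of size 1
logits = model(features)
print(device, logits.shape)
```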
---
## Model Selection Guide
| Use Case | Recommended Model | Why |
|----------|------------------|-----|
| Multi-timeframe analysis | **Transformer** | Handles 150-candle sequences across timeframes |
| Pattern recognition | **CNN** | Excellent at visual pattern detection |
| Sequential decisions | **DQN** | Learns optimal action sequences via RL |
| Order book dynamics | **COB RL** | Specialized for bid/ask imbalance |
| Pivot detection | **Extrema** | Lightweight, fast pivot predictions |
---
## Integration Example
```python
# Get base data input
base_input = data_provider.get_base_data_input(symbol, timestamp)
# CNN prediction
cnn_features = base_input.get_feature_vector()
cnn_output = cnn_model(cnn_features)
cnn_action = torch.argmax(cnn_output['action_probs'])
# Transformer prediction
transformer_batch = prepare_transformer_batch(base_input)
transformer_output = transformer_model(**transformer_batch)
transformer_action = torch.argmax(transformer_output['action_probs'])
# DQN prediction
dqn_state = base_input.get_feature_vector()
dqn_output = dqn_agent.select_action(dqn_state)
dqn_action = dqn_output['action']
# Ensemble decision
final_action = majority_vote([cnn_action, transformer_action, dqn_action])
```
---
## Notes
1. **Shape Conventions**: `[batch, ...]` indicates batch dimension first
2. **Dtype**: All tensors use `torch.float32` unless specified
3. **Gradients**: Only training targets require gradients
4. **Normalization**: Features are typically normalized to `[-1, 1]` or `[0, 1]`
5. **Missing Data**: Padded with zeros or last known values
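To make notes 4 and 5 concrete, a small sketch of min-max scaling to `[-1, 1]` plus zero-padding to a fixed length (illustrative; `7834` matches the CNN feature vector above, the helper itself is hypothetical):

```python
import numpy as np

def normalize_and_pad(features: np.ndarray, target_len: int = 7834) -> np.ndarray:
    """Scale features to [-1, 1] and zero-pad (or truncate) to a fixed length."""
    lo, hi = features.min(), features.max()
    if hi > lo:
        features = 2.0 * (features - lo) / (hi - lo) - 1.0  # min-max scale to [-1, 1]
    padded = np.zeros(target_len, dtype=np.float32)
    n = min(len(features), target_len)
    padded[:n] = features[:n]
    return padded

vec = normalize_and_pad(np.random.rand(6000).astype(np.float32))
print(vec.shape, vec.min(), vec.max())
```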