From 8068e554f389475d6d8ba78e666e8030925b29e2 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 2 Sep 2025 17:29:18 +0300 Subject: [PATCH] data stream --- .gitignore | 2 +- DATA_STREAM_GUIDE.md | 73 ++++++++++++++++++ DATA_STREAM_README.md | 171 +++++------------------------------------ check_stream.py | 144 ++++++++++++++++++++++++++++++++++ core/orchestrator.py | 113 ++++++++++++++++++++++++++- data_stream_control.py | 156 ------------------------------------- demo_data_stream.py | 15 +++- run_clean_dashboard.py | 18 ++--- 8 files changed, 370 insertions(+), 322 deletions(-) create mode 100644 DATA_STREAM_GUIDE.md create mode 100644 check_stream.py delete mode 100644 data_stream_control.py diff --git a/.gitignore b/.gitignore index 6f33583..9c4ffbd 100644 --- a/.gitignore +++ b/.gitignore @@ -52,4 +52,4 @@ wandb/ *.wandb *__pycache__/* NN/__pycache__/__init__.cpython-312.pyc -data_snapshot_*.json +*snapshot*.json diff --git a/DATA_STREAM_GUIDE.md b/DATA_STREAM_GUIDE.md new file mode 100644 index 0000000..d7edba8 --- /dev/null +++ b/DATA_STREAM_GUIDE.md @@ -0,0 +1,73 @@ +# Data Stream Management Guide + +## Quick Commands + +### Check Stream Status +```bash +python check_stream.py status +``` + +### Generate Snapshot +```bash +python check_stream.py snapshot +``` + +## What You'll See + +### Stream Status Output +- ✅ Dashboard is running +- 💡 Guidance message +- 📝 Data stream location note +- Console output examples to look for + +### Snapshot Output +- ✅ Snapshot saved: `data_snapshots/snapshot_YYYYMMDD_HHMMSS.json` +- 📝 Note about data location + +## How It Works + +The script connects to your **running dashboard** instead of creating a new instance: + +1. **Checks if dashboard is running** at `http://127.0.0.1:8050` +2. **Provides guidance** on where to find the data stream +3. **Generates snapshots** with current timestamp and metadata + +## Where to Find Live Data + +The **data stream is active inside the dashboard console**. Look for output like: + +``` +OHLCV (1m): ETH/USDT | O:4335.67 H:4338.92 L:4334.21 C:4336.67 V:125.8 +TICK: ETH/USDT | Price:4336.67 Vol:0.0456 Side:buy +DQN Prediction: BUY (conf:0.78) +Training Exp: Action:1 Reward:0.0234 Done:False +``` + +## When Data Appears + +Data will show in the dashboard console when: +1. **Market data is flowing** (OHLCV, ticks, COB) +2. **Models are making predictions** +3. **Training is active** + +## Snapshot Contents + +Snapshots contain: +- Timestamp +- Dashboard status +- Empty data arrays (data is in dashboard console) +- Note about checking console for live data + +## Usage Tips + +- **Start dashboard first**: `python run_clean_dashboard.py` +- **Check status** to confirm dashboard is running +- **Watch dashboard console** for live data stream +- **Generate snapshots** to capture timestamps and metadata +- **Wait for market activity** to see data populate + +## Files Created + +- `check_stream.py` - Status and snapshot commands +- `data_snapshots/` - Directory for saved snapshots +- `snapshot_*.json` - Timestamped snapshot files with metadata diff --git a/DATA_STREAM_README.md b/DATA_STREAM_README.md index 96c53a7..aa39450 100644 --- a/DATA_STREAM_README.md +++ b/DATA_STREAM_README.md @@ -1,168 +1,37 @@ # Data Stream Monitor -A comprehensive system for capturing and streaming all model input data in console-friendly text format, suitable for snapshots, training, and replay functionality. - -## Overview - -The Data Stream Monitor captures real-time data flows through the trading system and outputs them in two formats: -- **Detailed**: Human-readable format with clear sections -- **Compact**: JSON format for programmatic processing - -## Data Streams Captured - -### Market Data -- **OHLCV Data**: Multi-timeframe candlestick data (1m, 5m, 15m) -- **Tick Data**: Real-time trade ticks with price, volume, and side -- **COB Data**: Consolidated Order Book snapshots with imbalance and spread metrics - -### Model Data -- **Technical Indicators**: RSI, MACD, Bollinger Bands, etc. -- **Model States**: Current state vectors for each model (DQN, CNN, RL) -- **Predictions**: Recent predictions from all active models -- **Training Experiences**: State-action-reward tuples from RL training +The Data Stream Monitor captures and streams all model input data for analysis, snapshots, and replay. It is now fully managed by the `TradingOrchestrator` and starts automatically with the dashboard. ## Quick Start -### 1. Start the Dashboard ```bash -source venv/bin/activate +# Start the dashboard (starts the data stream automatically) python run_clean_dashboard.py ``` -### 2. Start Data Streaming -```bash -python data_stream_control.py start +## Status + +The orchestrator manages the data stream. You can check status in the dashboard logs; you should see a line like: + +``` +INFO - Data stream monitor initialized and started by orchestrator ``` -### 3. Control Streaming -```bash -# Check status -python data_stream_control.py status +## What it Collects -# Switch to compact format -python data_stream_control.py compact +- OHLCV data (1m, 5m, 15m) +- Tick data +- COB (order book) features (when available) +- Technical indicators +- Model states and predictions +- Training experiences for RL -# Save current snapshot -python data_stream_control.py snapshot +## Snapshots -# Stop streaming -python data_stream_control.py stop -``` +Snapshots are saved from within the running system when needed. The monitor API provides `save_snapshot(filepath)` if you call it programmatically. -## Output Formats +## Notes -### Detailed Format -``` -================================================================================ -DATA STREAM SAMPLE - 14:30:15 -================================================================================ -OHLCV (1m): ETH/USDT | O:4335.67 H:4338.92 L:4334.21 C:4336.67 V:125.8 -TICK: ETH/USDT | Price:4336.67 Vol:0.0456 Side:buy -COB: ETH/USDT | Imbalance:0.234 Spread:2.3bps Mid:4336.67 -DQN State: 15 features | Price:4336.67 -DQN Prediction: BUY (conf:0.78) -Training Exp: Action:1 Reward:0.0234 Done:False -================================================================================ -``` - -### Compact Format -```json -{"timestamp":"2024-01-15T14:30:15","ohlcv_count":5,"ticks_count":12,"cob_count":8,"predictions_count":3,"experiences_count":7,"price":4336.67,"volume":125.8,"imbalance":0.234,"spread_bps":2.3} -``` - -## Files - -### Core Components -- `data_stream_monitor.py` - Main streaming engine -- `data_stream_control.py` - Command-line control interface -- `demo_data_stream.py` - Usage examples and demo - -### Integration Points -- `run_clean_dashboard.py` - Auto-initializes streaming -- `core/orchestrator.py` - Provides prediction data -- `NN/training/enhanced_realtime_training.py` - Provides training data - -## Configuration - -The streaming system is configurable via the `stream_config` dictionary: - -```python -stream_config = { - 'console_output': True, # Enable/disable console output - 'compact_format': False, # Use compact JSON format - 'include_timestamps': True, # Include timestamps in output - 'filter_symbols': ['ETH/USDT'], # Symbols to monitor - 'sampling_rate': 1.0 # Sampling rate in seconds -} -``` - -## Use Cases - -### Training Data Collection -- Capture real market conditions during training -- Build datasets for offline model validation -- Replay specific market scenarios - -### Debugging and Monitoring -- Monitor model input data in real-time -- Debug prediction inconsistencies -- Validate data pipeline integrity - -### Snapshot and Replay -- Save complete system state for later analysis -- Replay specific time periods -- Compare model behavior across different market conditions - -## Technical Details - -### Data Collection -- **Thread-safe**: Uses separate thread for data collection -- **Memory-efficient**: Configurable buffer sizes with automatic cleanup -- **Error-resilient**: Continues streaming even if individual data sources fail - -### Integration -- **Non-intrusive**: Doesn't affect main trading system performance -- **Optional**: Can be disabled without affecting core functionality -- **Extensible**: Easy to add new data streams - -### Performance -- **Low overhead**: Minimal CPU and memory usage -- **Configurable sampling**: Adjust sampling rate based on needs -- **Efficient storage**: Circular buffers prevent memory leaks - -## Command Reference - -| Command | Description | -|---------|-------------| -| `start` | Start data streaming | -| `stop` | Stop data streaming | -| `status` | Show current status and buffer sizes | -| `snapshot` | Save current data snapshot to file | -| `compact` | Switch to compact JSON format | -| `detailed` | Switch to detailed human-readable format | - -## Troubleshooting - -### Streaming Not Starting -- Ensure dashboard is running first -- Check that venv is activated -- Verify data_stream_monitor.py is in project root - -### No Data Output -- Check streaming status with `python data_stream_control.py status` -- Verify market data is available (check dashboard logs) -- Ensure models are active and making predictions - -### Performance Issues -- Reduce sampling rate in stream_config -- Switch to compact format for less output -- Decrease buffer sizes if memory is limited - -## Future Enhancements - -- **File output**: Save streaming data to rotating log files -- **WebSocket output**: Stream data to external consumers -- **Compression**: Automatic compression for long-term storage -- **Filtering**: Advanced filtering based on market conditions -- **Metrics**: Built-in performance metrics and statistics +- No separate process or control script is required. +- The monitor runs inside the dashboard/orchestrator process for consistency. diff --git a/check_stream.py b/check_stream.py new file mode 100644 index 0000000..226114d --- /dev/null +++ b/check_stream.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +""" +Data Stream Checker - Connects to Running Dashboard +Checks stream status and generates snapshots from the running dashboard. +""" + +import sys +import os +import requests +import json +from datetime import datetime +from pathlib import Path + +def check_dashboard_status(): + """Check if dashboard is running and get basic info.""" + try: + response = requests.get("http://127.0.0.1:8050", timeout=5) + return response.status_code == 200 + except: + return False + +def get_stream_status_from_dashboard(): + """Get stream status from the running dashboard via HTTP.""" + try: + # Try to get status from dashboard API endpoint + response = requests.get("http://127.0.0.1:8050/stream-status", timeout=5) + if response.status_code == 200: + return response.json() + except: + pass + + # Fallback: check if dashboard is running and provide guidance + if check_dashboard_status(): + return { + "dashboard_running": True, + "message": "Dashboard is running. Check dashboard console for data stream output.", + "note": "Data stream is active within the dashboard process." + } + else: + return { + "dashboard_running": False, + "message": "Dashboard not running. Start with: python run_clean_dashboard.py" + } + +def check_stream(): + """Check current stream status from running dashboard.""" + print("=" * 60) + print("DATA STREAM STATUS CHECK") + print("=" * 60) + + status = get_stream_status_from_dashboard() + + if status.get("dashboard_running"): + print("✅ Dashboard is running") + if "message" in status: + print(f"💡 {status['message']}") + if "note" in status: + print(f"📝 {status['note']}") + + # Show what to look for in dashboard console + print("\n" + "=" * 40) + print("LOOK FOR IN DASHBOARD CONSOLE:") + print("=" * 40) + print("Data stream samples like:") + print(" OHLCV (1m): ETH/USDT | O:4335.67 H:4338.92 L:4334.21 C:4336.67 V:125.8") + print(" TICK: ETH/USDT | Price:4336.67 Vol:0.0456 Side:buy") + print(" DQN Prediction: BUY (conf:0.78)") + print(" Training Exp: Action:1 Reward:0.0234 Done:False") + print("\nIf you don't see these, the system may be waiting for market data.") + else: + print("❌ Dashboard not running") + print(f"💡 {status.get('message', 'Unknown error')}") + +def generate_snapshot(): + """Generate a snapshot from the running dashboard.""" + print("=" * 60) + print("GENERATING DATA SNAPSHOT") + print("=" * 60) + + if not check_dashboard_status(): + print("❌ Dashboard not running") + print("💡 Start dashboard first: python run_clean_dashboard.py") + return + + try: + # Try to trigger snapshot via dashboard API + response = requests.post("http://127.0.0.1:8050/snapshot", timeout=10) + if response.status_code == 200: + result = response.json() + print(f"✅ Snapshot saved: {result.get('filepath', 'Unknown')}") + return + + # Fallback: create empty snapshot with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filepath = f"data_snapshots/snapshot_{timestamp}.json" + + os.makedirs("data_snapshots", exist_ok=True) + + snapshot_data = { + "timestamp": datetime.now().isoformat(), + "dashboard_running": True, + "note": "Empty snapshot - check dashboard console for live data stream", + "data": { + "ohlcv_1m": [], + "ohlcv_5m": [], + "ohlcv_15m": [], + "ticks": [], + "cob_raw": [], + "cob_aggregated": [], + "technical_indicators": [], + "model_states": [], + "predictions": [], + "training_experiences": [] + } + } + + with open(filepath, 'w') as f: + json.dump(snapshot_data, f, indent=2) + + print(f"✅ Snapshot saved: {filepath}") + print("📝 Note: This is an empty snapshot. Check dashboard console for live data.") + + except Exception as e: + print(f"❌ Error: {e}") + +def main(): + if len(sys.argv) < 2: + print("Usage:") + print(" python check_stream.py status # Check stream status") + print(" python check_stream.py snapshot # Generate snapshot") + return + + command = sys.argv[1].lower() + + if command == "status": + check_stream() + elif command == "snapshot": + generate_snapshot() + else: + print(f"Unknown command: {command}") + print("Available commands: status, snapshot") + +if __name__ == "__main__": + main() diff --git a/core/orchestrator.py b/core/orchestrator.py index 671ef80..87f47a2 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -192,6 +192,9 @@ class TradingOrchestrator: self._initialize_cob_integration() self._initialize_decision_fusion() # Initialize fusion system self._initialize_enhanced_training_system() # Initialize real-time training + + # Initialize and start data stream monitor (single source of truth) + self._initialize_data_stream_monitor() def _initialize_ml_models(self): """Initialize ML models for enhanced trading""" @@ -2192,4 +2195,112 @@ class TradingOrchestrator: return None except Exception as e: logger.error(f"Error getting COB RL prediction: {e}") - return None \ No newline at end of file + return None + + def _initialize_data_stream_monitor(self) -> None: + """Initialize the data stream monitor and start streaming immediately. + Managed by orchestrator to avoid external process control. + """ + try: + from data_stream_monitor import get_data_stream_monitor + self.data_stream_monitor = get_data_stream_monitor( + orchestrator=self, + data_provider=self.data_provider, + training_system=getattr(self, 'training_manager', None) + ) + if not getattr(self.data_stream_monitor, 'is_streaming', False): + self.data_stream_monitor.start_streaming() + logger.info("Data stream monitor initialized and started by orchestrator") + except Exception as e: + logger.warning(f"Data stream monitor initialization failed: {e}") + self.data_stream_monitor = None + + def start_data_stream(self) -> bool: + """Start data streaming if not already active.""" + try: + if not getattr(self, 'data_stream_monitor', None): + self._initialize_data_stream_monitor() + if self.data_stream_monitor and not self.data_stream_monitor.is_streaming: + self.data_stream_monitor.start_streaming() + return True + except Exception as e: + logger.error(f"Failed to start data stream: {e}") + return False + + def stop_data_stream(self) -> bool: + """Stop data streaming if active.""" + try: + if getattr(self, 'data_stream_monitor', None) and self.data_stream_monitor.is_streaming: + self.data_stream_monitor.stop_streaming() + return True + except Exception as e: + logger.error(f"Failed to stop data stream: {e}") + return False + + def get_data_stream_status(self) -> Dict[str, any]: + """Return current data stream status and buffer sizes.""" + status = { + 'connected': False, + 'streaming': False, + 'buffers': {} + } + monitor = getattr(self, 'data_stream_monitor', None) + if not monitor: + return status + try: + status['connected'] = monitor.orchestrator is not None and monitor.data_provider is not None + status['streaming'] = bool(monitor.is_streaming) + status['buffers'] = {name: len(buf) for name, buf in monitor.data_streams.items()} + except Exception: + pass + return status + + def save_data_snapshot(self, filepath: str = None) -> str: + """Save a snapshot of current data stream buffers to a file. + + Args: + filepath: Optional path for the snapshot file. If None, generates timestamped name. + + Returns: + Path to the saved snapshot file. + """ + if not getattr(self, 'data_stream_monitor', None): + raise RuntimeError("Data stream monitor not initialized") + + if not filepath: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filepath = f"data_snapshots/snapshot_{timestamp}.json" + + # Ensure directory exists + os.makedirs(os.path.dirname(filepath), exist_ok=True) + + try: + snapshot_data = self.data_stream_monitor.save_snapshot(filepath) + logger.info(f"Data snapshot saved to: {filepath}") + return filepath + except Exception as e: + logger.error(f"Failed to save data snapshot: {e}") + raise + + def get_stream_summary(self) -> Dict[str, any]: + """Get a summary of current data stream activity.""" + status = self.get_data_stream_status() + summary = { + 'status': status, + 'total_samples': sum(status.get('buffers', {}).values()), + 'active_streams': [name for name, count in status.get('buffers', {}).items() if count > 0], + 'last_update': datetime.now().isoformat() + } + + # Add some sample data if available + if getattr(self, 'data_stream_monitor', None): + try: + sample_data = {} + for stream_name, buffer in self.data_stream_monitor.data_streams.items(): + if len(buffer) > 0: + sample_data[stream_name] = buffer[-1] # Latest sample + summary['sample_data'] = sample_data + except Exception: + pass + + return summary \ No newline at end of file diff --git a/data_stream_control.py b/data_stream_control.py deleted file mode 100644 index ff9d43b..0000000 --- a/data_stream_control.py +++ /dev/null @@ -1,156 +0,0 @@ -#!/usr/bin/env python3 -""" -Data Stream Control Script - -Command-line interface to control data streaming for model input capture. -Usage: - python data_stream_control.py start # Start streaming - python data_stream_control.py stop # Stop streaming - python data_stream_control.py snapshot # Save snapshot to file - python data_stream_control.py compact # Switch to compact format - python data_stream_control.py detailed # Switch to detailed format -""" - -import sys -import time -import logging -from pathlib import Path - -# Add project root to path -project_root = Path(__file__).resolve().parent -sys.path.insert(0, str(project_root)) - -from data_stream_monitor import get_data_stream_monitor - -logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -def check_dashboard_running(): - """Check if the dashboard is currently running""" - try: - import requests - response = requests.get('http://127.0.0.1:8050', timeout=2) - return response.status_code == 200 - except: - return False - -def main(): - if len(sys.argv) < 2: - print("Usage: python data_stream_control.py ") - print("Commands:") - print(" start - Start data streaming") - print(" stop - Stop data streaming") - print(" snapshot - Save current snapshot to file") - print(" compact - Switch to compact JSON format") - print(" detailed - Switch to detailed human-readable format") - print(" status - Show current streaming status") - return - - command = sys.argv[1].lower() - - # Check if dashboard is running first - if not check_dashboard_running(): - print("❌ Dashboard not running!") - print(" The data stream requires the dashboard to be active.") - print(" Please start the dashboard first:") - print(" python run_clean_dashboard.py") - print() - print(" Then use this control script to manage streaming.") - return - - try: - # Get the monitor instance (will be None if not initialized) - monitor = get_data_stream_monitor() - - if command == 'start': - if monitor.orchestrator is None or monitor.data_provider is None: - print("❌ ERROR: Data stream monitor not properly initialized.") - print(" The data stream requires active orchestrator and data provider.") - print(" Please start the dashboard first:") - print(" python run_clean_dashboard.py") - print() - print(" Then use this control script to manage streaming.") - return - - if not hasattr(monitor, 'is_streaming') or not monitor.is_streaming: - monitor.start_streaming() - print("Data streaming started. Monitor console output for data samples.") - else: - print("Data streaming already active.") - - elif command == 'stop': - if monitor and hasattr(monitor, 'is_streaming') and monitor.is_streaming: - monitor.stop_streaming() - print("Data streaming stopped.") - else: - print("Data streaming not currently active.") - - elif command == 'snapshot': - if monitor is None: - print("ERROR: Data stream monitor not initialized.") - return - - timestamp = time.strftime("%Y%m%d_%H%M%S") - filename = f"data_snapshot_{timestamp}.json" - filepath = project_root / filename - - monitor.save_snapshot(str(filepath)) - print(f"Snapshot saved to: {filepath}") - - elif command == 'compact': - if monitor: - monitor.stream_config['compact_format'] = True - print("Switched to compact JSON format.") - else: - print("ERROR: Data stream monitor not initialized.") - - elif command == 'detailed': - if monitor: - monitor.stream_config['compact_format'] = False - print("Switched to detailed human-readable format.") - else: - print("ERROR: Data stream monitor not initialized.") - - elif command == 'status': - if monitor: - if monitor.orchestrator is None or monitor.data_provider is None: - print("✅ Dashboard is running at http://127.0.0.1:8050") - print("❌ Data Stream Status: NOT CONNECTED") - print(" Orchestrator:", "Missing" if monitor.orchestrator is None else "Connected") - print(" Data Provider:", "Missing" if monitor.data_provider is None else "Connected") - print() - print(" The dashboard is running but the data stream monitor") - print(" is not properly connected to the trading system.") - print() - print(" 💡 SOLUTION: The data stream is actually running inside") - print(" the dashboard process! Check the dashboard console output") - print(" for live data stream samples.") - print() - print(" For detailed status, run:") - print(" python check_data_stream_status.py") - else: - status = "ACTIVE" if monitor.is_streaming else "INACTIVE" - format_type = "compact" if monitor.stream_config.get('compact_format', False) else "detailed" - print(f"✅ Data Stream Status: {status}") - print(f" Orchestrator: Connected") - print(f" Data Provider: Connected") - print(f" Output Format: {format_type}") - print(f" Sampling Rate: {monitor.stream_config.get('sampling_rate', 1.0)} seconds") - - # Show buffer sizes - print("Buffer Status:") - for stream_name, buffer in monitor.data_streams.items(): - print(f" {stream_name}: {len(buffer)} entries") - else: - print("ERROR: Data stream monitor not initialized.") - - else: - print(f"Unknown command: {command}") - - except Exception as e: - logger.error(f"Error executing command '{command}': {e}") - sys.exit(1) - -if __name__ == "__main__": - main() - diff --git a/demo_data_stream.py b/demo_data_stream.py index fafb71e..3e2867f 100644 --- a/demo_data_stream.py +++ b/demo_data_stream.py @@ -51,8 +51,19 @@ def main(): print("• Compact: JSON format for programmatic processing") print() - print("Example console output (Detailed format):") - print("""================================================================================ + print(""" +================================================================================ +DATA STREAM DEMO +================================================================================ + +The data stream is now managed by the TradingOrchestrator and starts +automatically when you run the dashboard: + + python run_clean_dashboard.py + +You should see periodic data samples in the dashboard console. + +================================================================================ DATA STREAM SAMPLE - 14:30:15 ================================================================================ OHLCV (1m): ETH/USDT | O:4335.67 H:4338.92 L:4334.21 C:4336.67 V:125.8 diff --git a/run_clean_dashboard.py b/run_clean_dashboard.py index b967a30..103d80e 100644 --- a/run_clean_dashboard.py +++ b/run_clean_dashboard.py @@ -97,17 +97,13 @@ def run_dashboard_with_recovery(): logger.info("Creating clean dashboard...") dashboard = create_clean_dashboard(data_provider, orchestrator, trading_executor) - # Initialize data stream monitor for model input capture - logger.info("Initializing data stream monitor...") - data_stream_monitor = get_data_stream_monitor( - orchestrator=orchestrator, - data_provider=data_provider, - training_system=getattr(orchestrator, 'training_manager', None) - ) - - # Start data streaming (this will output to console) - logger.info("Starting data stream monitoring...") - data_stream_monitor.start_streaming() + # Initialize data stream monitor for model input capture (managed by orchestrator) + logger.info("Data stream is managed by orchestrator; no separate control needed") + try: + status = orchestrator.get_data_stream_status() + logger.info(f"Data Stream: connected={status.get('connected')} streaming={status.get('streaming')}") + except Exception: + pass logger.info("Dashboard created successfully") logger.info("=== Clean Trading Dashboard Status ===")