157 lines
6.3 KiB
Python
157 lines
6.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Data Stream Control Script
|
|
|
|
Command-line interface to control data streaming for model input capture.
|
|
Usage:
|
|
python data_stream_control.py start # Start streaming
|
|
python data_stream_control.py stop # Stop streaming
|
|
python data_stream_control.py snapshot # Save snapshot to file
|
|
python data_stream_control.py compact # Switch to compact format
|
|
python data_stream_control.py detailed # Switch to detailed format
python data_stream_control.py status # Show current streaming status
|
|
"""
|
|
|
|
import sys
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
# Resolve the directory that holds this script and place it at the front of
# sys.path so modules living next to the script are found first.
project_root = Path(__file__).resolve().parent
sys.path[:0] = [str(project_root)]
|
|
|
|
from data_stream_monitor import get_data_stream_monitor
|
|
|
|
# Configure root logging (INFO level, timestamped lines), then create the
# logger used by this module.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
def check_dashboard_running(url='http://127.0.0.1:8050', timeout=2):
    """Check if the dashboard is currently running.

    Sends a single HTTP GET to ``url`` and reports whether it answered with
    status code 200.

    Args:
        url: Address to probe. Defaults to the local dashboard address.
        timeout: Seconds to wait for the response before giving up.

    Returns:
        True if the GET returned status 200; False if the request failed,
        returned another status, or the ``requests`` package is unavailable.
    """
    # Imported lazily so the module itself can be imported without requests.
    try:
        import requests
    except ImportError:
        return False

    # Only request-level failures (refused connection, timeout, bad URL, ...)
    # mean "not running"; KeyboardInterrupt/SystemExit must propagate.
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException:
        return False
    return response.status_code == 200
|
|
|
|
def _print_usage():
    """Print the list of supported commands."""
    print("Usage: python data_stream_control.py <command>")
    print("Commands:")
    print(" start - Start data streaming")
    print(" stop - Stop data streaming")
    print(" snapshot - Save current snapshot to file")
    print(" compact - Switch to compact JSON format")
    print(" detailed - Switch to detailed human-readable format")
    print(" status - Show current streaming status")


def _print_dashboard_hint():
    """Print the instructions for starting the dashboard first."""
    print(" Please start the dashboard first:")
    print(" python run_clean_dashboard.py")
    print()
    print(" Then use this control script to manage streaming.")


def _print_not_initialized():
    """Print the error shown when no monitor instance is available."""
    print("ERROR: Data stream monitor not initialized.")


def _is_connected(monitor):
    """Return True when the monitor has both an orchestrator and a data provider."""
    return monitor.orchestrator is not None and monitor.data_provider is not None


def _cmd_start(monitor):
    """Start streaming unless the monitor is unusable or already streaming."""
    # Guard against a missing monitor as well as a disconnected one, so the
    # user gets the explanatory message instead of an AttributeError.
    if monitor is None or not _is_connected(monitor):
        print("❌ ERROR: Data stream monitor not properly initialized.")
        print(" The data stream requires active orchestrator and data provider.")
        _print_dashboard_hint()
        return

    if getattr(monitor, 'is_streaming', False):
        print("Data streaming already active.")
        return

    monitor.start_streaming()
    print("Data streaming started. Monitor console output for data samples.")


def _cmd_stop(monitor):
    """Stop streaming if it is currently active."""
    if monitor is not None and getattr(monitor, 'is_streaming', False):
        monitor.stop_streaming()
        print("Data streaming stopped.")
    else:
        print("Data streaming not currently active.")


def _cmd_snapshot(monitor):
    """Save a timestamped snapshot file next to this script."""
    if monitor is None:
        _print_not_initialized()
        return

    timestamp = time.strftime("%Y%m%d_%H%M%S")
    filepath = project_root / f"data_snapshot_{timestamp}.json"
    monitor.save_snapshot(str(filepath))
    print(f"Snapshot saved to: {filepath}")


def _set_compact_format(monitor, compact):
    """Set the monitor's 'compact_format' config flag and confirm the switch."""
    if monitor is None:
        _print_not_initialized()
        return

    monitor.stream_config['compact_format'] = compact
    if compact:
        print("Switched to compact JSON format.")
    else:
        print("Switched to detailed human-readable format.")


def _cmd_compact(monitor):
    """Switch the stream output to compact format."""
    _set_compact_format(monitor, True)


def _cmd_detailed(monitor):
    """Switch the stream output to detailed format."""
    _set_compact_format(monitor, False)


def _cmd_status(monitor):
    """Print connection state, stream settings and buffer sizes."""
    if monitor is None:
        _print_not_initialized()
        return

    if not _is_connected(monitor):
        print("✅ Dashboard is running at http://127.0.0.1:8050")
        print("❌ Data Stream Status: NOT CONNECTED")
        print(" Orchestrator:", "Missing" if monitor.orchestrator is None else "Connected")
        print(" Data Provider:", "Missing" if monitor.data_provider is None else "Connected")
        print()
        print(" The dashboard is running but the data stream monitor")
        print(" is not properly connected to the trading system.")
        print()
        print(" 💡 SOLUTION: The data stream is actually running inside")
        print(" the dashboard process! Check the dashboard console output")
        print(" for live data stream samples.")
        print()
        print(" For detailed status, run:")
        print(" python check_data_stream_status.py")
        return

    status = "ACTIVE" if getattr(monitor, 'is_streaming', False) else "INACTIVE"
    format_type = "compact" if monitor.stream_config.get('compact_format', False) else "detailed"
    print(f"✅ Data Stream Status: {status}")
    print(" Orchestrator: Connected")
    print(" Data Provider: Connected")
    print(f" Output Format: {format_type}")
    print(f" Sampling Rate: {monitor.stream_config.get('sampling_rate', 1.0)} seconds")

    # Show buffer sizes
    print("Buffer Status:")
    for stream_name, buffer in monitor.data_streams.items():
        print(f" {stream_name}: {len(buffer)} entries")


# Maps each CLI command name to the helper that carries it out.
_COMMAND_HANDLERS = {
    'start': _cmd_start,
    'stop': _cmd_stop,
    'snapshot': _cmd_snapshot,
    'compact': _cmd_compact,
    'detailed': _cmd_detailed,
    'status': _cmd_status,
}


def main():
    """Run the command given as the first command-line argument.

    Reads the command from ``sys.argv[1]`` (case-insensitive). With no
    argument, or an unrecognised one, prints the usage text and returns.
    Otherwise verifies the dashboard is reachable, fetches the data stream
    monitor and dispatches to the matching command helper.

    Exits the process with status 1 if fetching the monitor or running the
    command raises an exception; the error is logged first.
    """
    if len(sys.argv) < 2:
        _print_usage()
        return

    command = sys.argv[1].lower()

    # Reject typos before probing the dashboard, so the user is told about
    # the bad command rather than about an unrelated dashboard problem.
    handler = _COMMAND_HANDLERS.get(command)
    if handler is None:
        print(f"Unknown command: {command}")
        _print_usage()
        return

    # Check if dashboard is running first
    if not check_dashboard_running():
        print("❌ Dashboard not running!")
        print(" The data stream requires the dashboard to be active.")
        _print_dashboard_hint()
        return

    try:
        # May be None when no monitor has been initialised; every helper
        # handles that case itself.
        monitor = get_data_stream_monitor()
        handler(monitor)
    except Exception as e:
        logger.error("Error executing command '%s': %s", command, e)
        sys.exit(1)
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|