data stream working

This commit is contained in:
Dobromir Popov
2025-09-02 17:59:12 +03:00
parent 8068e554f3
commit c55175c44d
8 changed files with 1000 additions and 132 deletions

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""
Data Stream Checker - Connects to Running Dashboard
Checks stream status and generates snapshots from the running dashboard.
Data Stream Checker - Consumes Dashboard API
Checks stream status, gets OHLCV data, COB data, and generates snapshots via API.
"""
import sys
@@ -14,131 +14,223 @@ from pathlib import Path
def check_dashboard_status():
"""Check if dashboard is running and get basic info."""
try:
response = requests.get("http://127.0.0.1:8050", timeout=5)
return response.status_code == 200
response = requests.get("http://127.0.0.1:8050/api/health", timeout=5)
return response.status_code == 200, response.json()
except:
return False
return False, {}
def get_stream_status_from_dashboard():
"""Get stream status from the running dashboard via HTTP."""
def get_stream_status_from_api():
"""Get stream status from the dashboard API."""
try:
# Try to get status from dashboard API endpoint
response = requests.get("http://127.0.0.1:8050/stream-status", timeout=5)
response = requests.get("http://127.0.0.1:8050/api/stream-status", timeout=10)
if response.status_code == 200:
return response.json()
except:
pass
# Fallback: check if dashboard is running and provide guidance
if check_dashboard_status():
return {
"dashboard_running": True,
"message": "Dashboard is running. Check dashboard console for data stream output.",
"note": "Data stream is active within the dashboard process."
}
else:
return {
"dashboard_running": False,
"message": "Dashboard not running. Start with: python run_clean_dashboard.py"
}
except Exception as e:
print(f"Error getting stream status: {e}")
return None
def get_ohlcv_data_from_api(symbol='ETH/USDT', timeframe='1m', limit=300):
"""Get OHLCV data with indicators from the dashboard API."""
try:
url = f"http://127.0.0.1:8050/api/ohlcv-data"
params = {'symbol': symbol, 'timeframe': timeframe, 'limit': limit}
response = requests.get(url, params=params, timeout=10)
if response.status_code == 200:
return response.json()
except Exception as e:
print(f"Error getting OHLCV data: {e}")
return None
def get_cob_data_from_api(symbol='ETH/USDT', limit=300):
"""Get COB data with price buckets from the dashboard API."""
try:
url = f"http://127.0.0.1:8050/api/cob-data"
params = {'symbol': symbol, 'limit': limit}
response = requests.get(url, params=params, timeout=10)
if response.status_code == 200:
return response.json()
except Exception as e:
print(f"Error getting COB data: {e}")
return None
def create_snapshot_via_api():
"""Create a snapshot via the dashboard API."""
try:
response = requests.post("http://127.0.0.1:8050/api/snapshot", timeout=10)
if response.status_code == 200:
return response.json()
except Exception as e:
print(f"Error creating snapshot: {e}")
return None
def check_stream():
"""Check current stream status from running dashboard."""
"""Check current stream status from dashboard API."""
print("=" * 60)
print("DATA STREAM STATUS CHECK")
print("=" * 60)
status = get_stream_status_from_dashboard()
if status.get("dashboard_running"):
print("✅ Dashboard is running")
if "message" in status:
print(f"💡 {status['message']}")
if "note" in status:
print(f"📝 {status['note']}")
# Show what to look for in dashboard console
print("\n" + "=" * 40)
print("LOOK FOR IN DASHBOARD CONSOLE:")
print("=" * 40)
print("Data stream samples like:")
print(" OHLCV (1m): ETH/USDT | O:4335.67 H:4338.92 L:4334.21 C:4336.67 V:125.8")
print(" TICK: ETH/USDT | Price:4336.67 Vol:0.0456 Side:buy")
print(" DQN Prediction: BUY (conf:0.78)")
print(" Training Exp: Action:1 Reward:0.0234 Done:False")
print("\nIf you don't see these, the system may be waiting for market data.")
else:
print("❌ Dashboard not running")
print(f"💡 {status.get('message', 'Unknown error')}")
def generate_snapshot():
"""Generate a snapshot from the running dashboard."""
print("=" * 60)
print("GENERATING DATA SNAPSHOT")
print("=" * 60)
if not check_dashboard_status():
# Check dashboard health
dashboard_running, health_data = check_dashboard_status()
if not dashboard_running:
print("❌ Dashboard not running")
print("💡 Start dashboard first: python run_clean_dashboard.py")
return
try:
# Try to trigger snapshot via dashboard API
response = requests.post("http://127.0.0.1:8050/snapshot", timeout=10)
if response.status_code == 200:
result = response.json()
print(f"✅ Snapshot saved: {result.get('filepath', 'Unknown')}")
return
print("✅ Dashboard is running")
print(f"📊 Health: {health_data.get('status', 'unknown')}")
# Get stream status
stream_data = get_stream_status_from_api()
if stream_data:
status = stream_data.get('status', {})
summary = stream_data.get('summary', {})
# Fallback: create empty snapshot with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filepath = f"data_snapshots/snapshot_{timestamp}.json"
print(f"\n🔄 Stream Status:")
print(f" Connected: {status.get('connected', False)}")
print(f" Streaming: {status.get('streaming', False)}")
print(f" Total Samples: {summary.get('total_samples', 0)}")
print(f" Active Streams: {len(summary.get('active_streams', []))}")
os.makedirs("data_snapshots", exist_ok=True)
if summary.get('active_streams'):
print(f" Active: {', '.join(summary['active_streams'])}")
snapshot_data = {
"timestamp": datetime.now().isoformat(),
"dashboard_running": True,
"note": "Empty snapshot - check dashboard console for live data stream",
"data": {
"ohlcv_1m": [],
"ohlcv_5m": [],
"ohlcv_15m": [],
"ticks": [],
"cob_raw": [],
"cob_aggregated": [],
"technical_indicators": [],
"model_states": [],
"predictions": [],
"training_experiences": []
}
}
print(f"\n📈 Buffer Sizes:")
buffers = status.get('buffers', {})
for stream, count in buffers.items():
status_icon = "🟢" if count > 0 else "🔴"
print(f" {status_icon} {stream}: {count}")
with open(filepath, 'w') as f:
json.dump(snapshot_data, f, indent=2)
if summary.get('sample_data'):
print(f"\n📝 Latest Samples:")
for stream, sample in summary['sample_data'].items():
print(f" {stream}: {str(sample)[:100]}...")
else:
print("❌ Could not get stream status from API")
def show_ohlcv_data():
"""Show OHLCV data with indicators."""
print("=" * 60)
print("OHLCV DATA WITH INDICATORS")
print("=" * 60)
# Check dashboard health
dashboard_running, _ = check_dashboard_status()
if not dashboard_running:
print("❌ Dashboard not running")
print("💡 Start dashboard first: python run_clean_dashboard.py")
return
# Get OHLCV data for different timeframes
timeframes = ['1s', '1m', '1h', '1d']
symbol = 'ETH/USDT'
for timeframe in timeframes:
print(f"\n📊 {symbol} {timeframe} Data:")
data = get_ohlcv_data_from_api(symbol, timeframe, 300)
print(f"✅ Snapshot saved: {filepath}")
print("📝 Note: This is an empty snapshot. Check dashboard console for live data.")
if data and data.get('data'):
ohlcv_data = data['data']
print(f" Records: {len(ohlcv_data)}")
if ohlcv_data:
latest = ohlcv_data[-1]
print(f" Latest: {latest['timestamp']}")
print(f" Price: ${latest['close']:.2f}")
indicators = latest.get('indicators', {})
if indicators:
print(f" RSI: {indicators.get('rsi', 'N/A')}")
print(f" MACD: {indicators.get('macd', 'N/A')}")
print(f" SMA20: {indicators.get('sma_20', 'N/A')}")
else:
print(f" No data available")
def show_cob_data():
"""Show COB data with price buckets."""
print("=" * 60)
print("COB DATA WITH PRICE BUCKETS")
print("=" * 60)
# Check dashboard health
dashboard_running, _ = check_dashboard_status()
if not dashboard_running:
print("❌ Dashboard not running")
print("💡 Start dashboard first: python run_clean_dashboard.py")
return
symbol = 'ETH/USDT'
print(f"\n📊 {symbol} COB Data:")
data = get_cob_data_from_api(symbol, 300)
if data and data.get('data'):
cob_data = data['data']
print(f" Records: {len(cob_data)}")
except Exception as e:
print(f"❌ Error: {e}")
if cob_data:
latest = cob_data[-1]
print(f" Latest: {latest['timestamp']}")
print(f" Mid Price: ${latest['mid_price']:.2f}")
print(f" Spread: {latest['spread']:.4f}")
print(f" Imbalance: {latest['imbalance']:.4f}")
price_buckets = latest.get('price_buckets', {})
if price_buckets:
print(f" Price Buckets: {len(price_buckets)} ($1 increments)")
# Show some sample buckets
bucket_count = 0
for price, bucket in price_buckets.items():
if bucket['bid_volume'] > 0 or bucket['ask_volume'] > 0:
print(f" ${price}: Bid={bucket['bid_volume']:.2f} Ask={bucket['ask_volume']:.2f}")
bucket_count += 1
if bucket_count >= 5: # Show first 5 active buckets
break
else:
print(f" No COB data available")
def generate_snapshot():
"""Generate a snapshot via API."""
print("=" * 60)
print("GENERATING DATA SNAPSHOT")
print("=" * 60)
# Check dashboard health
dashboard_running, _ = check_dashboard_status()
if not dashboard_running:
print("❌ Dashboard not running")
print("💡 Start dashboard first: python run_clean_dashboard.py")
return
# Create snapshot via API
result = create_snapshot_via_api()
if result:
print(f"✅ Snapshot saved: {result.get('filepath', 'Unknown')}")
print(f"📅 Timestamp: {result.get('timestamp', 'Unknown')}")
else:
print("❌ Failed to create snapshot via API")
def main():
if len(sys.argv) < 2:
print("Usage:")
print(" python check_stream.py status # Check stream status")
print(" python check_stream.py snapshot # Generate snapshot")
print(" python check_stream.py status # Check stream status")
print(" python check_stream.py ohlcv # Show OHLCV data")
print(" python check_stream.py cob # Show COB data")
print(" python check_stream.py snapshot # Generate snapshot")
return
command = sys.argv[1].lower()
if command == "status":
check_stream()
elif command == "ohlcv":
show_ohlcv_data()
elif command == "cob":
show_cob_data()
elif command == "snapshot":
generate_snapshot()
else:
print(f"Unknown command: {command}")
print("Available commands: status, snapshot")
print("Available commands: status, ohlcv, cob, snapshot")
if __name__ == "__main__":
main()