From c55175c44d103c8f89e134f07dcb809242642288 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 2 Sep 2025 17:59:12 +0300 Subject: [PATCH] data stream working --- .vscode/tasks.json | 38 +++++- DATA_STREAM_GUIDE.md | 101 +++++++++----- check_stream.py | 276 +++++++++++++++++++++++++------------- core/orchestrator.py | 37 ++++- debug_dashboard.py | 56 ++++++++ kill_dashboard.py | 207 ++++++++++++++++++++++++++++ run_clean_dashboard.py | 120 +++++++++++++++++ web/clean_dashboard.py | 297 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 1000 insertions(+), 132 deletions(-) create mode 100644 debug_dashboard.py create mode 100644 kill_dashboard.py diff --git a/.vscode/tasks.json b/.vscode/tasks.json index ffcf7ef..5559264 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -4,15 +4,14 @@ { "label": "Kill Stale Processes", "type": "shell", - "command": "powershell", + "command": "python", "args": [ - "-Command", - "Get-Process python | Where-Object {$_.ProcessName -eq 'python' -and $_.MainWindowTitle -like '*dashboard*'} | Stop-Process -Force; Start-Sleep -Seconds 1" + "kill_dashboard.py" ], "group": "build", "presentation": { "echo": true, - "reveal": "silent", + "reveal": "always", "focus": false, "panel": "shared", "showReuseMessage": false, @@ -106,6 +105,37 @@ "panel": "shared" }, "problemMatcher": [] + }, + { + "label": "Debug Dashboard", + "type": "shell", + "command": "python", + "args": [ + "debug_dashboard.py" + ], + "group": "build", + "isBackground": true, + "presentation": { + "echo": true, + "reveal": "always", + "focus": false, + "panel": "new", + "showReuseMessage": false, + "clear": false + }, + "problemMatcher": { + "pattern": { + "regexp": "^.*$", + "file": 1, + "location": 2, + "message": 3 + }, + "background": { + "activeOnStart": true, + "beginsPattern": ".*Starting dashboard.*", + "endsPattern": ".*Dashboard.*ready.*" + } + } } ] } \ No newline at end of file diff --git a/DATA_STREAM_GUIDE.md b/DATA_STREAM_GUIDE.md index d7edba8..ee55d4b 100644 --- a/DATA_STREAM_GUIDE.md +++ b/DATA_STREAM_GUIDE.md @@ -7,6 +7,16 @@ python check_stream.py status ``` +### Show OHLCV Data with Indicators +```bash +python check_stream.py ohlcv +``` + +### Show COB Data with Price Buckets +```bash +python check_stream.py cob +``` + ### Generate Snapshot ```bash python check_stream.py snapshot @@ -16,58 +26,79 @@ python check_stream.py snapshot ### Stream Status Output - āœ… Dashboard is running -- šŸ’” Guidance message -- šŸ“ Data stream location note -- Console output examples to look for +- šŸ“Š Health status +- šŸ”„ Stream connection and streaming status +- šŸ“ˆ Total samples and active streams +- 🟢/šŸ”“ Buffer sizes for each data type + +### OHLCV Data Output +- šŸ“Š Data for 1s, 1m, 1h, 1d timeframes +- Records count and latest timestamp +- Current price and technical indicators: + - RSI (Relative Strength Index) + - MACD (Moving Average Convergence Divergence) + - SMA20 (Simple Moving Average 20-period) + +### COB Data Output +- šŸ“Š Order book data with price buckets +- Mid price, spread, and imbalance +- Price buckets in $1 increments +- Bid/ask volumes for each bucket ### Snapshot Output -- āœ… Snapshot saved: `data_snapshots/snapshot_YYYYMMDD_HHMMSS.json` -- šŸ“ Note about data location +- āœ… Snapshot saved with filepath +- šŸ“… Timestamp of creation -## How It Works +## API Endpoints -The script connects to your **running dashboard** instead of creating a new instance: +The dashboard exposes these REST API endpoints: -1. **Checks if dashboard is running** at `http://127.0.0.1:8050` -2. **Provides guidance** on where to find the data stream -3. **Generates snapshots** with current timestamp and metadata +- `GET /api/health` - Health check +- `GET /api/stream-status` - Data stream status +- `GET /api/ohlcv-data?symbol=ETH/USDT&timeframe=1m&limit=300` - OHLCV data with indicators +- `GET /api/cob-data?symbol=ETH/USDT&limit=300` - COB data with price buckets +- `POST /api/snapshot` - Generate data snapshot -## Where to Find Live Data +## Data Available -The **data stream is active inside the dashboard console**. Look for output like: +### OHLCV Data (300 points each) +- **1s**: Real-time tick data +- **1m**: 1-minute candlesticks +- **1h**: 1-hour candlesticks +- **1d**: Daily candlesticks -``` -OHLCV (1m): ETH/USDT | O:4335.67 H:4338.92 L:4334.21 C:4336.67 V:125.8 -TICK: ETH/USDT | Price:4336.67 Vol:0.0456 Side:buy -DQN Prediction: BUY (conf:0.78) -Training Exp: Action:1 Reward:0.0234 Done:False -``` +### Technical Indicators +- SMA (Simple Moving Average) 20, 50 +- EMA (Exponential Moving Average) 12, 26 +- RSI (Relative Strength Index) +- MACD (Moving Average Convergence Divergence) +- Bollinger Bands (Upper, Middle, Lower) +- Volume ratio + +### COB Data (300 points) +- **Price buckets**: $1 increments around mid price +- **Order book levels**: Bid/ask volumes and counts +- **Market microstructure**: Spread, imbalance, total volumes ## When Data Appears -Data will show in the dashboard console when: -1. **Market data is flowing** (OHLCV, ticks, COB) -2. **Models are making predictions** -3. **Training is active** - -## Snapshot Contents - -Snapshots contain: -- Timestamp -- Dashboard status -- Empty data arrays (data is in dashboard console) -- Note about checking console for live data +Data will be available when: +1. **Dashboard is running** (`python run_clean_dashboard.py`) +2. **Market data is flowing** (OHLCV, ticks, COB) +3. **Models are making predictions** +4. **Training is active** ## Usage Tips - **Start dashboard first**: `python run_clean_dashboard.py` -- **Check status** to confirm dashboard is running -- **Watch dashboard console** for live data stream -- **Generate snapshots** to capture timestamps and metadata +- **Check status** to confirm data is flowing +- **Use OHLCV command** to see price data with indicators +- **Use COB command** to see order book microstructure +- **Generate snapshots** to capture current state - **Wait for market activity** to see data populate ## Files Created -- `check_stream.py` - Status and snapshot commands +- `check_stream.py` - API client for data access - `data_snapshots/` - Directory for saved snapshots -- `snapshot_*.json` - Timestamped snapshot files with metadata +- `snapshot_*.json` - Timestamped snapshot files with full data diff --git a/check_stream.py b/check_stream.py index 226114d..f2cc2af 100644 --- a/check_stream.py +++ b/check_stream.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -Data Stream Checker - Connects to Running Dashboard -Checks stream status and generates snapshots from the running dashboard. +Data Stream Checker - Consumes Dashboard API +Checks stream status, gets OHLCV data, COB data, and generates snapshots via API. """ import sys @@ -14,131 +14,223 @@ from pathlib import Path def check_dashboard_status(): """Check if dashboard is running and get basic info.""" try: - response = requests.get("http://127.0.0.1:8050", timeout=5) - return response.status_code == 200 + response = requests.get("http://127.0.0.1:8050/api/health", timeout=5) + return response.status_code == 200, response.json() except: - return False + return False, {} -def get_stream_status_from_dashboard(): - """Get stream status from the running dashboard via HTTP.""" +def get_stream_status_from_api(): + """Get stream status from the dashboard API.""" try: - # Try to get status from dashboard API endpoint - response = requests.get("http://127.0.0.1:8050/stream-status", timeout=5) + response = requests.get("http://127.0.0.1:8050/api/stream-status", timeout=10) if response.status_code == 200: return response.json() - except: - pass - - # Fallback: check if dashboard is running and provide guidance - if check_dashboard_status(): - return { - "dashboard_running": True, - "message": "Dashboard is running. Check dashboard console for data stream output.", - "note": "Data stream is active within the dashboard process." - } - else: - return { - "dashboard_running": False, - "message": "Dashboard not running. Start with: python run_clean_dashboard.py" - } + except Exception as e: + print(f"Error getting stream status: {e}") + return None + +def get_ohlcv_data_from_api(symbol='ETH/USDT', timeframe='1m', limit=300): + """Get OHLCV data with indicators from the dashboard API.""" + try: + url = f"http://127.0.0.1:8050/api/ohlcv-data" + params = {'symbol': symbol, 'timeframe': timeframe, 'limit': limit} + response = requests.get(url, params=params, timeout=10) + if response.status_code == 200: + return response.json() + except Exception as e: + print(f"Error getting OHLCV data: {e}") + return None + +def get_cob_data_from_api(symbol='ETH/USDT', limit=300): + """Get COB data with price buckets from the dashboard API.""" + try: + url = f"http://127.0.0.1:8050/api/cob-data" + params = {'symbol': symbol, 'limit': limit} + response = requests.get(url, params=params, timeout=10) + if response.status_code == 200: + return response.json() + except Exception as e: + print(f"Error getting COB data: {e}") + return None + +def create_snapshot_via_api(): + """Create a snapshot via the dashboard API.""" + try: + response = requests.post("http://127.0.0.1:8050/api/snapshot", timeout=10) + if response.status_code == 200: + return response.json() + except Exception as e: + print(f"Error creating snapshot: {e}") + return None def check_stream(): - """Check current stream status from running dashboard.""" + """Check current stream status from dashboard API.""" print("=" * 60) print("DATA STREAM STATUS CHECK") print("=" * 60) - status = get_stream_status_from_dashboard() - - if status.get("dashboard_running"): - print("āœ… Dashboard is running") - if "message" in status: - print(f"šŸ’” {status['message']}") - if "note" in status: - print(f"šŸ“ {status['note']}") - - # Show what to look for in dashboard console - print("\n" + "=" * 40) - print("LOOK FOR IN DASHBOARD CONSOLE:") - print("=" * 40) - print("Data stream samples like:") - print(" OHLCV (1m): ETH/USDT | O:4335.67 H:4338.92 L:4334.21 C:4336.67 V:125.8") - print(" TICK: ETH/USDT | Price:4336.67 Vol:0.0456 Side:buy") - print(" DQN Prediction: BUY (conf:0.78)") - print(" Training Exp: Action:1 Reward:0.0234 Done:False") - print("\nIf you don't see these, the system may be waiting for market data.") - else: - print("āŒ Dashboard not running") - print(f"šŸ’” {status.get('message', 'Unknown error')}") - -def generate_snapshot(): - """Generate a snapshot from the running dashboard.""" - print("=" * 60) - print("GENERATING DATA SNAPSHOT") - print("=" * 60) - - if not check_dashboard_status(): + # Check dashboard health + dashboard_running, health_data = check_dashboard_status() + if not dashboard_running: print("āŒ Dashboard not running") print("šŸ’” Start dashboard first: python run_clean_dashboard.py") return - try: - # Try to trigger snapshot via dashboard API - response = requests.post("http://127.0.0.1:8050/snapshot", timeout=10) - if response.status_code == 200: - result = response.json() - print(f"āœ… Snapshot saved: {result.get('filepath', 'Unknown')}") - return + print("āœ… Dashboard is running") + print(f"šŸ“Š Health: {health_data.get('status', 'unknown')}") + + # Get stream status + stream_data = get_stream_status_from_api() + if stream_data: + status = stream_data.get('status', {}) + summary = stream_data.get('summary', {}) - # Fallback: create empty snapshot with timestamp - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - filepath = f"data_snapshots/snapshot_{timestamp}.json" + print(f"\nšŸ”„ Stream Status:") + print(f" Connected: {status.get('connected', False)}") + print(f" Streaming: {status.get('streaming', False)}") + print(f" Total Samples: {summary.get('total_samples', 0)}") + print(f" Active Streams: {len(summary.get('active_streams', []))}") - os.makedirs("data_snapshots", exist_ok=True) + if summary.get('active_streams'): + print(f" Active: {', '.join(summary['active_streams'])}") - snapshot_data = { - "timestamp": datetime.now().isoformat(), - "dashboard_running": True, - "note": "Empty snapshot - check dashboard console for live data stream", - "data": { - "ohlcv_1m": [], - "ohlcv_5m": [], - "ohlcv_15m": [], - "ticks": [], - "cob_raw": [], - "cob_aggregated": [], - "technical_indicators": [], - "model_states": [], - "predictions": [], - "training_experiences": [] - } - } + print(f"\nšŸ“ˆ Buffer Sizes:") + buffers = status.get('buffers', {}) + for stream, count in buffers.items(): + status_icon = "🟢" if count > 0 else "šŸ”“" + print(f" {status_icon} {stream}: {count}") - with open(filepath, 'w') as f: - json.dump(snapshot_data, f, indent=2) + if summary.get('sample_data'): + print(f"\nšŸ“ Latest Samples:") + for stream, sample in summary['sample_data'].items(): + print(f" {stream}: {str(sample)[:100]}...") + else: + print("āŒ Could not get stream status from API") + +def show_ohlcv_data(): + """Show OHLCV data with indicators.""" + print("=" * 60) + print("OHLCV DATA WITH INDICATORS") + print("=" * 60) + + # Check dashboard health + dashboard_running, _ = check_dashboard_status() + if not dashboard_running: + print("āŒ Dashboard not running") + print("šŸ’” Start dashboard first: python run_clean_dashboard.py") + return + + # Get OHLCV data for different timeframes + timeframes = ['1s', '1m', '1h', '1d'] + symbol = 'ETH/USDT' + + for timeframe in timeframes: + print(f"\nšŸ“Š {symbol} {timeframe} Data:") + data = get_ohlcv_data_from_api(symbol, timeframe, 300) - print(f"āœ… Snapshot saved: {filepath}") - print("šŸ“ Note: This is an empty snapshot. Check dashboard console for live data.") + if data and data.get('data'): + ohlcv_data = data['data'] + print(f" Records: {len(ohlcv_data)}") + + if ohlcv_data: + latest = ohlcv_data[-1] + print(f" Latest: {latest['timestamp']}") + print(f" Price: ${latest['close']:.2f}") + + indicators = latest.get('indicators', {}) + if indicators: + print(f" RSI: {indicators.get('rsi', 'N/A')}") + print(f" MACD: {indicators.get('macd', 'N/A')}") + print(f" SMA20: {indicators.get('sma_20', 'N/A')}") + else: + print(f" No data available") + +def show_cob_data(): + """Show COB data with price buckets.""" + print("=" * 60) + print("COB DATA WITH PRICE BUCKETS") + print("=" * 60) + + # Check dashboard health + dashboard_running, _ = check_dashboard_status() + if not dashboard_running: + print("āŒ Dashboard not running") + print("šŸ’” Start dashboard first: python run_clean_dashboard.py") + return + + symbol = 'ETH/USDT' + print(f"\nšŸ“Š {symbol} COB Data:") + + data = get_cob_data_from_api(symbol, 300) + if data and data.get('data'): + cob_data = data['data'] + print(f" Records: {len(cob_data)}") - except Exception as e: - print(f"āŒ Error: {e}") + if cob_data: + latest = cob_data[-1] + print(f" Latest: {latest['timestamp']}") + print(f" Mid Price: ${latest['mid_price']:.2f}") + print(f" Spread: {latest['spread']:.4f}") + print(f" Imbalance: {latest['imbalance']:.4f}") + + price_buckets = latest.get('price_buckets', {}) + if price_buckets: + print(f" Price Buckets: {len(price_buckets)} ($1 increments)") + + # Show some sample buckets + bucket_count = 0 + for price, bucket in price_buckets.items(): + if bucket['bid_volume'] > 0 or bucket['ask_volume'] > 0: + print(f" ${price}: Bid={bucket['bid_volume']:.2f} Ask={bucket['ask_volume']:.2f}") + bucket_count += 1 + if bucket_count >= 5: # Show first 5 active buckets + break + else: + print(f" No COB data available") + +def generate_snapshot(): + """Generate a snapshot via API.""" + print("=" * 60) + print("GENERATING DATA SNAPSHOT") + print("=" * 60) + + # Check dashboard health + dashboard_running, _ = check_dashboard_status() + if not dashboard_running: + print("āŒ Dashboard not running") + print("šŸ’” Start dashboard first: python run_clean_dashboard.py") + return + + # Create snapshot via API + result = create_snapshot_via_api() + if result: + print(f"āœ… Snapshot saved: {result.get('filepath', 'Unknown')}") + print(f"šŸ“… Timestamp: {result.get('timestamp', 'Unknown')}") + else: + print("āŒ Failed to create snapshot via API") def main(): if len(sys.argv) < 2: print("Usage:") - print(" python check_stream.py status # Check stream status") - print(" python check_stream.py snapshot # Generate snapshot") + print(" python check_stream.py status # Check stream status") + print(" python check_stream.py ohlcv # Show OHLCV data") + print(" python check_stream.py cob # Show COB data") + print(" python check_stream.py snapshot # Generate snapshot") return command = sys.argv[1].lower() if command == "status": check_stream() + elif command == "ohlcv": + show_ohlcv_data() + elif command == "cob": + show_cob_data() elif command == "snapshot": generate_snapshot() else: print(f"Unknown command: {command}") - print("Available commands: status, snapshot") + print("Available commands: status, ohlcv, cob, snapshot") if __name__ == "__main__": main() diff --git a/core/orchestrator.py b/core/orchestrator.py index 87f47a2..27e6eaf 100644 --- a/core/orchestrator.py +++ b/core/orchestrator.py @@ -2303,4 +2303,39 @@ class TradingOrchestrator: except Exception: pass - return summary \ No newline at end of file + return summary + + def get_cob_data(self, symbol: str, limit: int = 300) -> List: + """Get COB data for a symbol with specified limit.""" + try: + if hasattr(self, 'cob_integration') and self.cob_integration: + return self.cob_integration.get_cob_history(symbol, limit) + return [] + except Exception as e: + logger.error(f"Error getting COB data: {e}") + return [] + + def get_ohlcv_data(self, symbol: str, timeframe: str, limit: int = 300) -> List: + """Get OHLCV data for a symbol with specified timeframe and limit.""" + try: + ohlcv_df = self.data_provider.get_ohlcv(symbol, timeframe, limit=limit) + if ohlcv_df is None or ohlcv_df.empty: + return [] + + # Convert to list of dictionaries + result = [] + for _, row in ohlcv_df.iterrows(): + data_point = { + 'timestamp': row.name.isoformat() if hasattr(row.name, 'isoformat') else str(row.name), + 'open': float(row['open']), + 'high': float(row['high']), + 'low': float(row['low']), + 'close': float(row['close']), + 'volume': float(row['volume']) + } + result.append(data_point) + + return result + except Exception as e: + logger.error(f"Error getting OHLCV data: {e}") + return [] \ No newline at end of file diff --git a/debug_dashboard.py b/debug_dashboard.py new file mode 100644 index 0000000..e56bafd --- /dev/null +++ b/debug_dashboard.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +""" +Cross-Platform Debug Dashboard Script +Kills existing processes and starts the dashboard for debugging on both Linux and Windows. +""" + +import subprocess +import sys +import time +import logging +import platform + +# Setup logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +def main(): + logger.info("=== Cross-Platform Debug Dashboard Startup ===") + logger.info(f"Platform: {platform.system()} {platform.release()}") + + # Step 1: Kill existing processes + logger.info("Step 1: Cleaning up existing processes...") + try: + result = subprocess.run([sys.executable, 'kill_dashboard.py'], + capture_output=True, text=True, timeout=30) + if result.returncode == 0: + logger.info("āœ… Process cleanup completed") + else: + logger.warning("āš ļø Process cleanup had issues") + except subprocess.TimeoutExpired: + logger.warning("āš ļø Process cleanup timed out") + except Exception as e: + logger.error(f"āŒ Process cleanup failed: {e}") + + # Step 2: Wait a moment + logger.info("Step 2: Waiting for cleanup to settle...") + time.sleep(3) + + # Step 3: Start dashboard + logger.info("Step 3: Starting dashboard...") + try: + logger.info("šŸš€ Starting: python run_clean_dashboard.py") + logger.info("šŸ’” Dashboard will be available at: http://127.0.0.1:8050") + logger.info("šŸ’” API endpoints available at: http://127.0.0.1:8050/api/") + logger.info("šŸ’” Press Ctrl+C to stop") + + # Start the dashboard + subprocess.run([sys.executable, 'run_clean_dashboard.py']) + + except KeyboardInterrupt: + logger.info("šŸ›‘ Dashboard stopped by user") + except Exception as e: + logger.error(f"āŒ Dashboard failed to start: {e}") + +if __name__ == "__main__": + main() diff --git a/kill_dashboard.py b/kill_dashboard.py new file mode 100644 index 0000000..ab1e8e8 --- /dev/null +++ b/kill_dashboard.py @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 +""" +Cross-Platform Dashboard Process Cleanup Script +Works on both Linux and Windows systems. +""" + +import os +import sys +import time +import signal +import subprocess +import logging +import platform + +# Setup logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +def is_windows(): + """Check if running on Windows""" + return platform.system().lower() == "windows" + +def kill_processes_windows(): + """Kill dashboard processes on Windows""" + killed_count = 0 + + try: + # Use tasklist to find Python processes + result = subprocess.run(['tasklist', '/FI', 'IMAGENAME eq python.exe', '/FO', 'CSV'], + capture_output=True, text=True, timeout=10) + if result.returncode == 0: + lines = result.stdout.split('\n') + for line in lines[1:]: # Skip header + if line.strip() and 'python.exe' in line: + parts = line.split(',') + if len(parts) > 1: + pid = parts[1].strip('"') + try: + # Get command line to check if it's our dashboard + cmd_result = subprocess.run(['wmic', 'process', 'where', f'ProcessId={pid}', 'get', 'CommandLine', '/format:csv'], + capture_output=True, text=True, timeout=5) + if cmd_result.returncode == 0 and ('run_clean_dashboard' in cmd_result.stdout or 'clean_dashboard' in cmd_result.stdout): + logger.info(f"Killing Windows process {pid}") + subprocess.run(['taskkill', '/PID', pid, '/F'], + capture_output=True, timeout=5) + killed_count += 1 + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + except Exception as e: + logger.debug(f"Error checking process {pid}: {e}") + except (subprocess.TimeoutExpired, FileNotFoundError): + logger.debug("tasklist not available") + except Exception as e: + logger.error(f"Error in Windows process cleanup: {e}") + + return killed_count + +def kill_processes_linux(): + """Kill dashboard processes on Linux""" + killed_count = 0 + + # Find and kill processes by name + process_names = [ + 'run_clean_dashboard', + 'clean_dashboard', + 'python.*run_clean_dashboard', + 'python.*clean_dashboard' + ] + + for process_name in process_names: + try: + # Use pgrep to find processes + result = subprocess.run(['pgrep', '-f', process_name], + capture_output=True, text=True, timeout=10) + if result.returncode == 0 and result.stdout.strip(): + pids = result.stdout.strip().split('\n') + for pid in pids: + if pid.strip(): + try: + logger.info(f"Killing Linux process {pid} ({process_name})") + os.kill(int(pid), signal.SIGTERM) + killed_count += 1 + except (ProcessLookupError, ValueError) as e: + logger.debug(f"Process {pid} already terminated: {e}") + except Exception as e: + logger.warning(f"Error killing process {pid}: {e}") + except (subprocess.TimeoutExpired, FileNotFoundError): + logger.debug(f"pgrep not available for {process_name}") + + # Kill processes using port 8050 + try: + result = subprocess.run(['lsof', '-ti', ':8050'], + capture_output=True, text=True, timeout=10) + if result.returncode == 0 and result.stdout.strip(): + pids = result.stdout.strip().split('\n') + logger.info(f"Found processes using port 8050: {pids}") + + for pid in pids: + if pid.strip(): + try: + logger.info(f"Killing process {pid} using port 8050") + os.kill(int(pid), signal.SIGTERM) + killed_count += 1 + except (ProcessLookupError, ValueError) as e: + logger.debug(f"Process {pid} already terminated: {e}") + except Exception as e: + logger.warning(f"Error killing process {pid}: {e}") + except (subprocess.TimeoutExpired, FileNotFoundError): + logger.debug("lsof not available") + + return killed_count + +def check_port_8050(): + """Check if port 8050 is free (cross-platform)""" + import socket + + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('127.0.0.1', 8050)) + return True + except OSError: + return False + +def kill_dashboard_processes(): + """Kill all dashboard-related processes (cross-platform)""" + logger.info("Killing dashboard processes...") + + if is_windows(): + logger.info("Detected Windows system") + killed_count = kill_processes_windows() + else: + logger.info("Detected Linux/Unix system") + killed_count = kill_processes_linux() + + # Wait for processes to terminate + if killed_count > 0: + logger.info(f"Killed {killed_count} processes, waiting for termination...") + time.sleep(3) + + # Force kill any remaining processes + if is_windows(): + # Windows force kill + try: + result = subprocess.run(['tasklist', '/FI', 'IMAGENAME eq python.exe', '/FO', 'CSV'], + capture_output=True, text=True, timeout=5) + if result.returncode == 0: + lines = result.stdout.split('\n') + for line in lines[1:]: + if line.strip() and 'python.exe' in line: + parts = line.split(',') + if len(parts) > 1: + pid = parts[1].strip('"') + try: + cmd_result = subprocess.run(['wmic', 'process', 'where', f'ProcessId={pid}', 'get', 'CommandLine', '/format:csv'], + capture_output=True, text=True, timeout=3) + if cmd_result.returncode == 0 and ('run_clean_dashboard' in cmd_result.stdout or 'clean_dashboard' in cmd_result.stdout): + logger.info(f"Force killing Windows process {pid}") + subprocess.run(['taskkill', '/PID', pid, '/F'], + capture_output=True, timeout=3) + except: + pass + except: + pass + else: + # Linux force kill + for process_name in ['run_clean_dashboard', 'clean_dashboard']: + try: + result = subprocess.run(['pgrep', '-f', process_name], + capture_output=True, text=True, timeout=5) + if result.returncode == 0 and result.stdout.strip(): + pids = result.stdout.strip().split('\n') + for pid in pids: + if pid.strip(): + try: + logger.info(f"Force killing Linux process {pid}") + os.kill(int(pid), signal.SIGKILL) + except (ProcessLookupError, ValueError): + pass + except Exception as e: + logger.warning(f"Error force killing process {pid}: {e}") + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + return killed_count + +def main(): + logger.info("=== Cross-Platform Dashboard Process Cleanup ===") + logger.info(f"Platform: {platform.system()} {platform.release()}") + + # Kill processes + killed = kill_dashboard_processes() + + # Check port status + port_free = check_port_8050() + + logger.info("=== Cleanup Summary ===") + logger.info(f"Processes killed: {killed}") + logger.info(f"Port 8050 free: {port_free}") + + if port_free: + logger.info("āœ… Ready for debugging - port 8050 is available") + else: + logger.warning("āš ļø Port 8050 may still be in use") + logger.info("šŸ’” Try running this script again or restart your system") + +if __name__ == "__main__": + main() diff --git a/run_clean_dashboard.py b/run_clean_dashboard.py index 103d80e..cb8868e 100644 --- a/run_clean_dashboard.py +++ b/run_clean_dashboard.py @@ -60,6 +60,118 @@ def check_system_resources(): return False return True +def kill_existing_dashboard_processes(): + """Kill any existing dashboard processes and free port 8050""" + import subprocess + import signal + + try: + # Find processes using port 8050 + logger.info("Checking for processes using port 8050...") + + # Method 1: Use lsof to find processes using port 8050 + try: + result = subprocess.run(['lsof', '-ti', ':8050'], + capture_output=True, text=True, timeout=10) + if result.returncode == 0 and result.stdout.strip(): + pids = result.stdout.strip().split('\n') + logger.info(f"Found processes using port 8050: {pids}") + + for pid in pids: + if pid.strip(): + try: + logger.info(f"Killing process {pid}") + os.kill(int(pid), signal.SIGTERM) + time.sleep(1) + # Force kill if still running + os.kill(int(pid), signal.SIGKILL) + except (ProcessLookupError, ValueError) as e: + logger.debug(f"Process {pid} already terminated: {e}") + except Exception as e: + logger.warning(f"Error killing process {pid}: {e}") + except (subprocess.TimeoutExpired, FileNotFoundError): + logger.debug("lsof not available or timed out") + + # Method 2: Use ps and grep to find Python processes + try: + result = subprocess.run(['ps', 'aux'], + capture_output=True, text=True, timeout=10) + if result.returncode == 0: + lines = result.stdout.split('\n') + for line in lines: + if 'run_clean_dashboard' in line or 'clean_dashboard' in line: + parts = line.split() + if len(parts) > 1: + pid = parts[1] + try: + logger.info(f"Killing dashboard process {pid}") + os.kill(int(pid), signal.SIGTERM) + time.sleep(1) + os.kill(int(pid), signal.SIGKILL) + except (ProcessLookupError, ValueError) as e: + logger.debug(f"Process {pid} already terminated: {e}") + except Exception as e: + logger.warning(f"Error killing process {pid}: {e}") + except (subprocess.TimeoutExpired, FileNotFoundError): + logger.debug("ps not available or timed out") + + # Method 3: Use netstat to find processes using port 8050 + try: + result = subprocess.run(['netstat', '-tlnp'], + capture_output=True, text=True, timeout=10) + if result.returncode == 0: + lines = result.stdout.split('\n') + for line in lines: + if ':8050' in line and 'LISTEN' in line: + parts = line.split() + if len(parts) > 6: + pid_part = parts[6] + if '/' in pid_part: + pid = pid_part.split('/')[0] + try: + logger.info(f"Killing process {pid} using port 8050") + os.kill(int(pid), signal.SIGTERM) + time.sleep(1) + os.kill(int(pid), signal.SIGKILL) + except (ProcessLookupError, ValueError) as e: + logger.debug(f"Process {pid} already terminated: {e}") + except Exception as e: + logger.warning(f"Error killing process {pid}: {e}") + except (subprocess.TimeoutExpired, FileNotFoundError): + logger.debug("netstat not available or timed out") + + # Wait a bit for processes to fully terminate + time.sleep(2) + + # Verify port is free + try: + result = subprocess.run(['lsof', '-ti', ':8050'], + capture_output=True, text=True, timeout=5) + if result.returncode == 0 and result.stdout.strip(): + logger.warning("Port 8050 still in use after cleanup") + return False + else: + logger.info("Port 8050 is now free") + return True + except (subprocess.TimeoutExpired, FileNotFoundError): + logger.info("Port 8050 cleanup verification skipped") + return True + + except Exception as e: + logger.error(f"Error during process cleanup: {e}") + return False + +def check_port_availability(port=8050): + """Check if a port is available""" + import socket + + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('127.0.0.1', port)) + return True + except OSError: + return False + def run_dashboard_with_recovery(): """Run dashboard with automatic error recovery""" max_retries = 3 @@ -69,6 +181,14 @@ def run_dashboard_with_recovery(): try: logger.info(f"Starting Clean Trading Dashboard (attempt {retry_count + 1}/{max_retries})") + # Clean up existing processes and free port 8050 + if not check_port_availability(8050): + logger.info("Port 8050 is in use, cleaning up existing processes...") + if not kill_existing_dashboard_processes(): + logger.warning("Failed to free port 8050, waiting 10 seconds...") + time.sleep(10) + continue + # Check system resources if not check_system_resources(): logger.warning("System resources low, waiting 30 seconds...") diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index c84b3fb..19a5803 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -232,6 +232,9 @@ class CleanTradingDashboard: ''' + # Add API endpoints to the Flask server + self._add_api_endpoints() + # Suppress Dash development mode logging self.app.enable_dev_tools(debug=False, dev_tools_silence_routes_logging=True) @@ -265,6 +268,300 @@ class CleanTradingDashboard: logger.debug("Clean Trading Dashboard initialized with HIGH-FREQUENCY COB integration and signal generation") + def _add_api_endpoints(self): + """Add API endpoints to the Flask server for data access""" + from flask import jsonify, request + + @self.app.server.route('/api/stream-status', methods=['GET']) + def get_stream_status(): + """Get data stream status""" + try: + status = self.orchestrator.get_data_stream_status() + summary = self.orchestrator.get_stream_summary() + return jsonify({ + 'status': status, + 'summary': summary, + 'timestamp': datetime.now().isoformat() + }) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + @self.app.server.route('/api/ohlcv-data', methods=['GET']) + def get_ohlcv_data(): + """Get OHLCV data with indicators""" + try: + symbol = request.args.get('symbol', 'ETH/USDT') + timeframe = request.args.get('timeframe', '1m') + limit = int(request.args.get('limit', 300)) + + # Get OHLCV data from orchestrator + ohlcv_data = self._get_ohlcv_data_with_indicators(symbol, timeframe, limit) + return jsonify({ + 'symbol': symbol, + 'timeframe': timeframe, + 'data': ohlcv_data, + 'timestamp': datetime.now().isoformat() + }) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + @self.app.server.route('/api/cob-data', methods=['GET']) + def get_cob_data(): + """Get COB data with price buckets""" + try: + symbol = request.args.get('symbol', 'ETH/USDT') + limit = int(request.args.get('limit', 300)) + + # Get COB data from orchestrator + cob_data = self._get_cob_data_with_buckets(symbol, limit) + return jsonify({ + 'symbol': symbol, + 'data': cob_data, + 'timestamp': datetime.now().isoformat() + }) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + @self.app.server.route('/api/snapshot', methods=['POST']) + def create_snapshot(): + """Create a data snapshot""" + try: + filepath = self.orchestrator.save_data_snapshot() + return jsonify({ + 'filepath': filepath, + 'timestamp': datetime.now().isoformat() + }) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + @self.app.server.route('/api/health', methods=['GET']) + def health_check(): + """Health check endpoint""" + return jsonify({ + 'status': 'healthy', + 'dashboard_running': True, + 'orchestrator_active': hasattr(self, 'orchestrator'), + 'timestamp': datetime.now().isoformat() + }) + + def _get_ohlcv_data_with_indicators(self, symbol: str, timeframe: str, limit: int = 300): + """Get OHLCV data with technical indicators from data stream monitor""" + try: + # Get OHLCV data from data stream monitor + if hasattr(self.orchestrator, 'data_stream_monitor') and self.orchestrator.data_stream_monitor: + stream_key = f"ohlcv_{timeframe}" + if stream_key in self.orchestrator.data_stream_monitor.data_streams: + ohlcv_data = list(self.orchestrator.data_stream_monitor.data_streams[stream_key]) + + # Take the last 'limit' items + ohlcv_data = ohlcv_data[-limit:] if len(ohlcv_data) > limit else ohlcv_data + + if not ohlcv_data: + return [] + + # Convert to DataFrame for indicator calculation + df_data = [] + for item in ohlcv_data: + df_data.append({ + 'timestamp': item.get('timestamp', ''), + 'open': float(item.get('open', 0)), + 'high': float(item.get('high', 0)), + 'low': float(item.get('low', 0)), + 'close': float(item.get('close', 0)), + 'volume': float(item.get('volume', 0)) + }) + + if not df_data: + return [] + + df = pd.DataFrame(df_data) + df['timestamp'] = pd.to_datetime(df['timestamp']) + df.set_index('timestamp', inplace=True) + + # Add technical indicators + df['sma_20'] = df['close'].rolling(window=20).mean() + df['sma_50'] = df['close'].rolling(window=50).mean() + df['ema_12'] = df['close'].ewm(span=12).mean() + df['ema_26'] = df['close'].ewm(span=26).mean() + + # RSI + delta = df['close'].diff() + gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() + loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() + rs = gain / loss + df['rsi'] = 100 - (100 / (1 + rs)) + + # MACD + df['macd'] = df['ema_12'] - df['ema_26'] + df['macd_signal'] = df['macd'].ewm(span=9).mean() + df['macd_histogram'] = df['macd'] - df['macd_signal'] + + # Bollinger Bands + df['bb_middle'] = df['close'].rolling(window=20).mean() + bb_std = df['close'].rolling(window=20).std() + df['bb_upper'] = df['bb_middle'] + (bb_std * 2) + df['bb_lower'] = df['bb_middle'] - (bb_std * 2) + + # Volume indicators + df['volume_sma'] = df['volume'].rolling(window=20).mean() + df['volume_ratio'] = df['volume'] / df['volume_sma'] + + # Convert to list of dictionaries + result = [] + for _, row in df.iterrows(): + data_point = { + 'timestamp': row.name.isoformat() if hasattr(row.name, 'isoformat') else str(row.name), + 'open': float(row['open']), + 'high': float(row['high']), + 'low': float(row['low']), + 'close': float(row['close']), + 'volume': float(row['volume']), + 'indicators': { + 'sma_20': float(row['sma_20']) if pd.notna(row['sma_20']) else None, + 'sma_50': float(row['sma_50']) if pd.notna(row['sma_50']) else None, + 'ema_12': float(row['ema_12']) if pd.notna(row['ema_12']) else None, + 'ema_26': float(row['ema_26']) if pd.notna(row['ema_26']) else None, + 'rsi': float(row['rsi']) if pd.notna(row['rsi']) else None, + 'macd': float(row['macd']) if pd.notna(row['macd']) else None, + 'macd_signal': float(row['macd_signal']) if pd.notna(row['macd_signal']) else None, + 'macd_histogram': float(row['macd_histogram']) if pd.notna(row['macd_histogram']) else None, + 'bb_upper': float(row['bb_upper']) if pd.notna(row['bb_upper']) else None, + 'bb_middle': float(row['bb_middle']) if pd.notna(row['bb_middle']) else None, + 'bb_lower': float(row['bb_lower']) if pd.notna(row['bb_lower']) else None, + 'volume_ratio': float(row['volume_ratio']) if pd.notna(row['volume_ratio']) else None + } + } + result.append(data_point) + + return result + + # Fallback to data provider if stream monitor not available + ohlcv_data = self.data_provider.get_ohlcv(symbol, timeframe, limit=limit) + + if ohlcv_data is None or ohlcv_data.empty: + return [] + + # Add technical indicators + df = ohlcv_data.copy() + + # Basic indicators + df['sma_20'] = df['close'].rolling(window=20).mean() + df['sma_50'] = df['close'].rolling(window=50).mean() + df['ema_12'] = df['close'].ewm(span=12).mean() + df['ema_26'] = df['close'].ewm(span=26).mean() + + # RSI + delta = df['close'].diff() + gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() + loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() + rs = gain / loss + df['rsi'] = 100 - (100 / (1 + rs)) + + # MACD + df['macd'] = df['ema_12'] - df['ema_26'] + df['macd_signal'] = df['macd'].ewm(span=9).mean() + df['macd_histogram'] = df['macd'] - df['macd_signal'] + + # Bollinger Bands + df['bb_middle'] = df['close'].rolling(window=20).mean() + bb_std = df['close'].rolling(window=20).std() + df['bb_upper'] = df['bb_middle'] + (bb_std * 2) + df['bb_lower'] = df['bb_middle'] - (bb_std * 2) + + # Volume indicators + df['volume_sma'] = df['volume'].rolling(window=20).mean() + df['volume_ratio'] = df['volume'] / df['volume_sma'] + + # Convert to list of dictionaries + result = [] + for _, row in df.iterrows(): + data_point = { + 'timestamp': row.name.isoformat() if hasattr(row.name, 'isoformat') else str(row.name), + 'open': float(row['open']), + 'high': float(row['high']), + 'low': float(row['low']), + 'close': float(row['close']), + 'volume': float(row['volume']), + 'indicators': { + 'sma_20': float(row['sma_20']) if pd.notna(row['sma_20']) else None, + 'sma_50': float(row['sma_50']) if pd.notna(row['sma_50']) else None, + 'ema_12': float(row['ema_12']) if pd.notna(row['ema_12']) else None, + 'ema_26': float(row['ema_26']) if pd.notna(row['ema_26']) else None, + 'rsi': float(row['rsi']) if pd.notna(row['rsi']) else None, + 'macd': float(row['macd']) if pd.notna(row['macd']) else None, + 'macd_signal': float(row['macd_signal']) if pd.notna(row['macd_signal']) else None, + 'macd_histogram': float(row['macd_histogram']) if pd.notna(row['macd_histogram']) else None, + 'bb_upper': float(row['bb_upper']) if pd.notna(row['bb_upper']) else None, + 'bb_middle': float(row['bb_middle']) if pd.notna(row['bb_middle']) else None, + 'bb_lower': float(row['bb_lower']) if pd.notna(row['bb_lower']) else None, + 'volume_ratio': float(row['volume_ratio']) if pd.notna(row['volume_ratio']) else None + } + } + result.append(data_point) + + return result + + except Exception as e: + logger.error(f"Error getting OHLCV data: {e}") + return [] + + def _get_cob_data_with_buckets(self, symbol: str, limit: int = 300): + """Get COB data with price buckets ($1 increments)""" + try: + # Get COB data from orchestrator + cob_data = self.orchestrator.get_cob_data(symbol, limit) + + if not cob_data: + return [] + + # Process COB data into price buckets + result = [] + for cob_snapshot in cob_data: + # Create price buckets ($1 increments) + price_buckets = {} + mid_price = cob_snapshot.mid_price + + # Create buckets around mid price + for i in range(-50, 51): # -$50 to +$50 from mid price + bucket_price = mid_price + i + bucket_key = f"{bucket_price:.2f}" + price_buckets[bucket_key] = { + 'bid_volume': 0, + 'ask_volume': 0, + 'bid_count': 0, + 'ask_count': 0 + } + + # Fill buckets with order book data + for level in cob_snapshot.bids: + bucket_price = f"{level.price:.2f}" + if bucket_price in price_buckets: + price_buckets[bucket_price]['bid_volume'] += level.volume + price_buckets[bucket_price]['bid_count'] += 1 + + for level in cob_snapshot.asks: + bucket_price = f"{level.price:.2f}" + if bucket_price in price_buckets: + price_buckets[bucket_price]['ask_volume'] += level.volume + price_buckets[bucket_price]['ask_count'] += 1 + + data_point = { + 'timestamp': cob_snapshot.timestamp.isoformat() if hasattr(cob_snapshot.timestamp, 'isoformat') else str(cob_snapshot.timestamp), + 'mid_price': float(cob_snapshot.mid_price), + 'spread': float(cob_snapshot.spread), + 'imbalance': float(cob_snapshot.imbalance), + 'price_buckets': price_buckets, + 'total_bid_volume': float(cob_snapshot.total_bid_volume), + 'total_ask_volume': float(cob_snapshot.total_ask_volume) + } + result.append(data_point) + + return result + + except Exception as e: + logger.error(f"Error getting COB data: {e}") + return [] + def _get_universal_data_from_orchestrator(self) -> Optional[UniversalDataStream]: """Get universal data through orchestrator as per architecture.""" try: