#!/usr/bin/env python3
"""
MEXC Auto Browser with Request Interception

This script automatically spawns a ChromeDriver instance and captures all
MEXC futures trading requests in real-time, including full request and
response data needed for reverse engineering.
"""

import logging
import time
import json
import sys
import os
from typing import Dict, List, Optional, Any
from datetime import datetime
import threading
import queue

# Selenium imports
try:
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.chrome.service import Service
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support.ui import WebDriverWait
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.common.exceptions import TimeoutException, WebDriverException
    from webdriver_manager.chrome import ChromeDriverManager
except ImportError:
    print("Please install selenium and webdriver-manager:")
    print("pip install selenium webdriver-manager")
    sys.exit(1)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class MEXCRequestInterceptor:
    """
    Automatically spawns ChromeDriver and intercepts all MEXC API requests
    """

    def __init__(self, headless: bool = False, save_to_file: bool = True):
        """
        Initialize the request interceptor

        Args:
            headless: Run browser in headless mode
            save_to_file: Save captured requests to JSON file
        """
        self.driver = None
        self.headless = headless
        self.save_to_file = save_to_file
        self.captured_requests = []
        self.captured_responses = []
        self.session_cookies = {}
        self.monitoring = False
        self.request_queue = queue.Queue()

        # File paths for saving data
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.requests_file = f"mexc_requests_{self.timestamp}.json"
        self.cookies_file = f"mexc_cookies_{self.timestamp}.json"
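    # self.captured_requests and self.captured_responses are filled by the
    # background monitor thread. For reference, each record appended by
    # _process_request() / _process_response() below has roughly this shape
    # (values illustrative):
    #
    #   {
    #       'type': 'request',                   # or 'response'
    #       'timestamp': '2024-01-01T12:00:00',  # capture time, ISO 8601
    #       'url': 'https://futures.mexc.com/api/v1/private/...',
    #       'method': 'POST',                    # request records only
    #       'headers': {...},
    #       'postData': '{...}',                 # request records only, may be empty
    #       'status': 200,                       # response records only
    #       'requestId': '1234.56',              # CDP id shared by request and response
    #   }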
    def setup_browser(self):
        """Setup Chrome browser with necessary options"""
        chrome_options = webdriver.ChromeOptions()

        # Enable headless mode if needed
        if self.headless:
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--disable-gpu')
            chrome_options.add_argument('--window-size=1920,1080')
            chrome_options.add_argument('--disable-extensions')

        # Set up Chrome options with a user data directory to persist session
        user_data_base_dir = os.path.join(os.getcwd(), 'chrome_user_data')
        os.makedirs(user_data_base_dir, exist_ok=True)

        # Check for existing session directories
        session_dirs = [d for d in os.listdir(user_data_base_dir) if d.startswith('session_')]
        session_dirs.sort(reverse=True)  # Sort descending to get the most recent first

        user_data_dir = None
        if session_dirs:
            use_existing = input(
                f"Found {len(session_dirs)} existing sessions. Use an existing session? (y/n): "
            ).lower().strip() == 'y'
            if use_existing:
                print("Available sessions:")
                for i, session in enumerate(session_dirs[:5], 1):  # Show up to 5 most recent
                    print(f"{i}. {session}")
                choice = input("Enter session number (default 1) or any other key for most recent: ")
                if choice.isdigit() and 1 <= int(choice) <= len(session_dirs):
                    selected_session = session_dirs[int(choice) - 1]
                else:
                    selected_session = session_dirs[0]
                user_data_dir = os.path.join(user_data_base_dir, selected_session)
                print(f"Using session: {selected_session}")

        if user_data_dir is None:
            user_data_dir = os.path.join(user_data_base_dir, f'session_{self.timestamp}')
            os.makedirs(user_data_dir, exist_ok=True)
            print(f"Creating new session: session_{self.timestamp}")

        chrome_options.add_argument(f'--user-data-dir={user_data_dir}')

        # Enable logging to capture JS console output and network activity
        chrome_options.set_capability('goog:loggingPrefs', {
            'browser': 'ALL',
            'performance': 'ALL'
        })

        try:
            self.driver = webdriver.Chrome(options=chrome_options)
        except Exception as e:
            print(f"Failed to start browser with session: {e}")
            print("Falling back to a new session...")
            user_data_dir = os.path.join(user_data_base_dir, f'session_{self.timestamp}_fallback')
            os.makedirs(user_data_dir, exist_ok=True)
            print(f"Creating fallback session: session_{self.timestamp}_fallback")

            chrome_options = webdriver.ChromeOptions()
            if self.headless:
                chrome_options.add_argument('--headless')
                chrome_options.add_argument('--disable-gpu')
                chrome_options.add_argument('--window-size=1920,1080')
                chrome_options.add_argument('--disable-extensions')
            chrome_options.add_argument(f'--user-data-dir={user_data_dir}')
            chrome_options.set_capability('goog:loggingPrefs', {
                'browser': 'ALL',
                'performance': 'ALL'
            })
            self.driver = webdriver.Chrome(options=chrome_options)

        return self.driver

    def start_monitoring(self):
        """Start the browser and begin monitoring"""
        logger.info("Starting MEXC Request Interceptor...")

        try:
            # Setup ChromeDriver
            self.driver = self.setup_browser()

            # Navigate to MEXC futures
            mexc_url = "https://www.mexc.com/en-GB/futures/ETH_USDT?type=linear_swap"
            logger.info(f"Navigating to: {mexc_url}")
            self.driver.get(mexc_url)

            # Wait for page load
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )

            logger.info("✅ MEXC page loaded successfully!")
            logger.info("📝 Please log in manually in the browser window")
            logger.info("🔍 Request monitoring is now active...")

            # Start monitoring in background thread
            self.monitoring = True
            monitor_thread = threading.Thread(target=self._monitor_requests, daemon=True)
            monitor_thread.start()

            # Wait for manual login
            self._wait_for_login()

            return True

        except Exception as e:
            logger.error(f"Failed to start monitoring: {e}")
            return False

    def _wait_for_login(self):
        """Wait for user to log in and show interactive menu"""
        logger.info("\n" + "=" * 60)
        logger.info("MEXC REQUEST INTERCEPTOR - INTERACTIVE MODE")
        logger.info("=" * 60)

        while True:
            print("\nOptions:")
            print("1. Check login status")
            print("2. Extract current cookies")
            print("3. Show captured requests summary")
            print("4. Save captured data to files")
            print("5. Perform test trade (manual)")
            print("6. Monitor for 60 seconds")
            print("0. Stop and exit")

            choice = input("\nEnter choice (0-6): ").strip()

            if choice == "1":
                self._check_login_status()
            elif choice == "2":
                self._extract_cookies()
            elif choice == "3":
                self._show_requests_summary()
            elif choice == "4":
                self._save_all_data()
            elif choice == "5":
                self._guide_test_trade()
            elif choice == "6":
                self._monitor_for_duration(60)
            elif choice == "0":
                break
            else:
                print("Invalid choice. Please try again.")

        self.stop_monitoring()

    def _check_login_status(self):
        """Check if user is logged into MEXC"""
        try:
            cookies = self.driver.get_cookies()
            auth_cookies = ['uc_token', 'u_id', 'x-mxc-fingerprint']
            found_auth = []

            for cookie in cookies:
                if cookie['name'] in auth_cookies and cookie['value']:
                    found_auth.append(cookie['name'])

            if len(found_auth) >= 2:
                print("✅ LOGIN DETECTED - You appear to be logged in!")
                print(f"   Found auth cookies: {', '.join(found_auth)}")
                return True
            else:
                print("❌ NOT LOGGED IN - Please log in to MEXC in the browser")
                print("   Missing required authentication cookies")
                return False

        except Exception as e:
            print(f"❌ Error checking login: {e}")
            return False

    def _extract_cookies(self):
        """Extract and display current session cookies"""
        try:
            cookies = self.driver.get_cookies()
            cookie_dict = {}

            for cookie in cookies:
                cookie_dict[cookie['name']] = cookie['value']

            self.session_cookies = cookie_dict

            print(f"\n📊 Extracted {len(cookie_dict)} cookies:")

            # Show important cookies
            important = ['uc_token', 'u_id', 'x-mxc-fingerprint', 'mexc_fingerprint_visitorId']
            for name in important:
                if name in cookie_dict:
                    value = cookie_dict[name]
                    display_value = value[:20] + "..." if len(value) > 20 else value
                    print(f"   ✅ {name}: {display_value}")
                else:
                    print(f"   ❌ {name}: Missing")

            # Save cookies to file
            if self.save_to_file:
                with open(self.cookies_file, 'w') as f:
                    json.dump(cookie_dict, f, indent=2)
                print(f"\n💾 Cookies saved to: {self.cookies_file}")

        except Exception as e:
            print(f"❌ Error extracting cookies: {e}")
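    # _monitor_requests() below parses Chrome's 'performance' log, which is only
    # available because setup_browser() set the 'goog:loggingPrefs' capability.
    # Each entry returned by driver.get_log('performance') looks roughly like
    # this (values illustrative); the interesting part is the JSON string in
    # 'message', which wraps a DevTools Protocol event:
    #
    #   {
    #       'level': 'INFO',
    #       'timestamp': 1700000000000,
    #       'message': '{"message": {"method": "Network.requestWillBeSent",
    #                    "params": {"requestId": "1234.56",
    #                               "request": {"url": "...", "method": "POST",
    #                                           "headers": {...}, "postData": "..."}}},
    #                   "webview": "..."}'
    #   }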
    def _monitor_requests(self):
        """Background thread to monitor network requests"""
        last_log_count = 0

        while self.monitoring:
            try:
                # Get performance logs
                logs = self.driver.get_log('performance')

                for log in logs:
                    try:
                        message = json.loads(log['message'])
                        method = message.get('message', {}).get('method', '')

                        # Capture network requests
                        if method == 'Network.requestWillBeSent':
                            self._process_request(message['message']['params'])
                        elif method == 'Network.responseReceived':
                            self._process_response(message['message']['params'])

                    except (json.JSONDecodeError, KeyError):
                        continue

                # Show progress every 10 new requests
                if len(self.captured_requests) >= last_log_count + 10:
                    last_log_count = len(self.captured_requests)
                    logger.info(f"📈 Captured {len(self.captured_requests)} requests, "
                                f"{len(self.captured_responses)} responses")

            except Exception as e:
                if self.monitoring:  # Only log if we're still supposed to be monitoring
                    logger.debug(f"Monitor error: {e}")

            time.sleep(0.5)  # Check every 500ms

    def _process_request(self, request_data):
        """Process a captured network request"""
        try:
            url = request_data.get('request', {}).get('url', '')

            # Filter for MEXC API requests
            if self._is_mexc_request(url):
                request_info = {
                    'type': 'request',
                    'timestamp': datetime.now().isoformat(),
                    'url': url,
                    'method': request_data.get('request', {}).get('method', ''),
                    'headers': request_data.get('request', {}).get('headers', {}),
                    'postData': request_data.get('request', {}).get('postData', ''),
                    'requestId': request_data.get('requestId', '')
                }

                self.captured_requests.append(request_info)

                # Show important requests immediately
                if 'futures.mexc.com' in url or 'captcha' in url:
                    print(f"\n🚀 CAPTURED REQUEST: {request_info['method']} {url}")
                    if request_info['postData']:
                        print(f"   📄 POST Data: {request_info['postData'][:100]}...")

                # Enhanced captcha detection and detailed logging
                if 'captcha' in url.lower() or 'robot' in url.lower():
                    logger.info(f"CAPTCHA REQUEST DETECTED: "
                                f"{request_data.get('request', {}).get('method', 'UNKNOWN')} {url}")
                    logger.info(f"  Headers: {request_data.get('request', {}).get('headers', {})}")
                    if request_data.get('request', {}).get('postData', ''):
                        logger.info(f"  Data: {request_data.get('request', {}).get('postData', '')}")

                    # Attempt to capture related JavaScript or DOM elements (if possible)
                    if self.driver is not None:
                        try:
                            js_snippet = self.driver.execute_script(
                                "return document.querySelector('script[src*=\"captcha\"]') ? "
                                "document.querySelector('script[src*=\"captcha\"]').outerHTML : "
                                "'No captcha script found';"
                            )
                            logger.info(f"  Related JS Snippet: {js_snippet}")
                        except Exception as e:
                            logger.warning(f"  Could not capture JS snippet: {e}")
                        try:
                            dom_element = self.driver.execute_script(
                                "return document.querySelector('div[id*=\"captcha\"]') ? "
                                "document.querySelector('div[id*=\"captcha\"]').outerHTML : "
                                "'No captcha element found';"
                            )
                            logger.info(f"  Related DOM Element: {dom_element}")
                        except Exception as e:
                            logger.warning(f"  Could not capture DOM element: {e}")
                    else:
                        logger.warning("  Driver not initialized, cannot capture JS or DOM elements")

        except Exception as e:
            logger.debug(f"Error processing request: {e}")
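    # Both record types keep the DevTools 'requestId', so captured requests can
    # be joined with their responses after the fact. The helper below is a
    # hypothetical convenience for offline analysis (it is not called from the
    # interactive menu); a minimal sketch of that join:
    def pair_requests_with_responses(self):
        """Return (request, response-or-None) pairs matched by requestId."""
        responses_by_id = {
            resp.get('requestId'): resp
            for resp in self.captured_responses
            if resp.get('requestId')
        }
        return [
            (req, responses_by_id.get(req.get('requestId')))
            for req in self.captured_requests
        ]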
    def _process_response(self, response_data):
        """Process a captured network response"""
        try:
            url = response_data.get('response', {}).get('url', '')

            # Filter for MEXC API responses
            if self._is_mexc_request(url):
                response_info = {
                    'type': 'response',
                    'timestamp': datetime.now().isoformat(),
                    'url': url,
                    'status': response_data.get('response', {}).get('status', 0),
                    'headers': response_data.get('response', {}).get('headers', {}),
                    'requestId': response_data.get('requestId', '')
                }

                self.captured_responses.append(response_info)

                # Show important responses immediately
                if 'futures.mexc.com' in url or 'captcha' in url:
                    status = response_info['status']
                    status_emoji = "✅" if status == 200 else "❌"
                    print(f"   {status_emoji} RESPONSE: {status} for {url}")

        except Exception as e:
            logger.debug(f"Error processing response: {e}")

    def _is_mexc_request(self, url: str) -> bool:
        """Check if URL is a relevant MEXC API request"""
        mexc_indicators = [
            'futures.mexc.com',
            'ucgateway/captcha_api',
            'api/v1/private',
            'api/v3/order',
            'mexc.com/api'
        ]
        return any(indicator in url for indicator in mexc_indicators)

    def _show_requests_summary(self):
        """Show summary of captured requests"""
        print("\n📊 CAPTURE SUMMARY:")
        print(f"   Total Requests: {len(self.captured_requests)}")
        print(f"   Total Responses: {len(self.captured_responses)}")

        # Group by URL pattern
        url_counts = {}
        for req in self.captured_requests:
            base_url = req['url'].split('?')[0]  # Remove query params
            url_counts[base_url] = url_counts.get(base_url, 0) + 1

        print("\n🔗 Top URLs:")
        for url, count in sorted(url_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
            print(f"   {count}x {url}")

        # Show recent futures API calls
        futures_requests = [r for r in self.captured_requests if 'futures.mexc.com' in r['url']]
        if futures_requests:
            print(f"\n🚀 Futures API Calls: {len(futures_requests)}")
            for req in futures_requests[-3:]:  # Show last 3
                print(f"   {req['method']} {req['url']}")

    def _save_all_data(self):
        """Save all captured data to files"""
        if not self.save_to_file:
            print("File saving is disabled")
            return

        try:
            # Save requests
            with open(self.requests_file, 'w') as f:
                json.dump({
                    'requests': self.captured_requests,
                    'responses': self.captured_responses,
                    'summary': {
                        'total_requests': len(self.captured_requests),
                        'total_responses': len(self.captured_responses),
                        'capture_session': self.timestamp
                    }
                }, f, indent=2)

            # Save cookies if we have them
            if self.session_cookies:
                with open(self.cookies_file, 'w') as f:
                    json.dump(self.session_cookies, f, indent=2)

            print("\n💾 Data saved to:")
            print(f"   📋 Requests: {self.requests_file}")
            if self.session_cookies:
                print(f"   🍪 Cookies: {self.cookies_file}")

            # Extract and save CAPTCHA tokens from captured requests
            captcha_tokens = self.extract_captcha_tokens()
            if captcha_tokens:
                captcha_file = f"mexc_captcha_tokens_{self.timestamp}.json"
                with open(captcha_file, 'w') as f:
                    json.dump(captcha_tokens, f, indent=2)
                logger.info(f"Saved CAPTCHA tokens to {captcha_file}")
            else:
                logger.warning("No CAPTCHA tokens found in captured requests")

        except Exception as e:
            print(f"❌ Error saving data: {e}")
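    # For reference, the requests file written above ends up with this layout
    # (record fields as documented near the top of the class; values illustrative):
    #
    #   {
    #     "requests":  [ {...}, {...} ],
    #     "responses": [ {...}, {...} ],
    #     "summary": {
    #       "total_requests": 42,
    #       "total_responses": 40,
    #       "capture_session": "20240101_120000"
    #     }
    #   }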
    def _guide_test_trade(self):
        """Guide user through performing a test trade"""
        print("\n🧪 TEST TRADE GUIDE:")
        print("1. Make sure you're logged into MEXC")
        print("2. Go to the trading interface")
        print("3. Try to place a SMALL test trade (it may fail, but we'll capture the requests)")
        print("4. Watch the console for captured API calls")
        print("\n⚠️ IMPORTANT: Use very small amounts for testing!")

        input("\nPress Enter when you're ready to start monitoring...")
        self._monitor_for_duration(120)  # Monitor for 2 minutes

    def _monitor_for_duration(self, seconds: int):
        """Monitor requests for a specific duration"""
        print(f"\n🔍 Monitoring requests for {seconds} seconds...")
        print("Perform your trading actions now!")

        start_time = time.time()
        initial_count = len(self.captured_requests)

        while time.time() - start_time < seconds:
            current_count = len(self.captured_requests)
            new_requests = current_count - initial_count
            remaining = seconds - int(time.time() - start_time)

            print(f"\r⏱️ Time remaining: {remaining}s | New requests: {new_requests}", end="", flush=True)
            time.sleep(1)

        final_count = len(self.captured_requests)
        new_total = final_count - initial_count
        print(f"\n✅ Monitoring complete! Captured {new_total} new requests")

    def stop_monitoring(self):
        """Stop monitoring and close browser"""
        logger.info("Stopping request monitoring...")
        self.monitoring = False

        if self.driver:
            self.driver.quit()
            self.driver = None  # Make repeated stop_monitoring() calls safe
            logger.info("Browser closed")

        # Final save
        if self.save_to_file and (self.captured_requests or self.captured_responses):
            self._save_all_data()
            logger.info("Final data save complete")

    def extract_captcha_tokens(self):
        """Extract CAPTCHA tokens from captured requests and responses"""
        captcha_tokens = []

        # Tokens the browser sent as a request header
        for request in self.captured_requests:
            if 'captcha-token' in request.get('headers', {}):
                token = request['headers']['captcha-token']
                captcha_tokens.append({
                    'token': token,
                    'url': request.get('url', ''),
                    'timestamp': request.get('timestamp', '')
                })

        # Tokens returned by captcha endpoints as a response header
        for response in self.captured_responses:
            if 'captcha' in response.get('url', '').lower():
                headers = response.get('headers', {})
                if 'captcha-token' in headers:
                    captcha_tokens.append({
                        'token': headers['captcha-token'],
                        'url': response.get('url', ''),
                        'timestamp': response.get('timestamp', '')
                    })

        return captcha_tokens
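# The capture files written by _save_all_data() can be inspected offline once a
# session is finished. The function below is an illustrative, optional helper
# (not called anywhere in this script) showing one way to summarise such a
# file; the path passed in is whichever mexc_requests_*.json the run produced.
def summarize_capture_file(path: str, top_n: int = 10) -> None:
    """Print the most frequently seen endpoints in a saved capture file."""
    from collections import Counter

    with open(path, 'r') as f:
        capture = json.load(f)

    # Count requests per endpoint, ignoring query strings
    counts = Counter(req['url'].split('?')[0] for req in capture.get('requests', []))

    print(f"Capture session: {capture.get('summary', {}).get('capture_session', 'unknown')}")
    for url, count in counts.most_common(top_n):
        print(f"{count:5d}x {url}")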
def main():
    """Main function to run the interceptor"""
    print("🚀 MEXC Request Interceptor with ChromeDriver")
    print("=" * 50)
    print("This will automatically:")
    print("✅ Download/setup ChromeDriver")
    print("✅ Open MEXC futures page")
    print("✅ Capture all API requests/responses")
    print("✅ Extract session cookies")
    print("✅ Save data to JSON files")
    print("\nPress Ctrl+C to stop at any time")

    # Ask for preferences
    headless = input("\nRun in headless mode? (y/n): ").lower().strip() == 'y'

    interceptor = MEXCRequestInterceptor(headless=headless, save_to_file=True)

    try:
        success = interceptor.start_monitoring()
        if not success:
            print("❌ Failed to start monitoring")
            return
    except KeyboardInterrupt:
        print("\n\n⏹️ Stopping interceptor...")
    except Exception as e:
        print(f"\n❌ Error: {e}")
    finally:
        interceptor.stop_monitoring()
        print("\n👋 Goodbye!")


if __name__ == "__main__":
    main()