523 lines
22 KiB
Python
523 lines
22 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MEXC Auto Browser with Request Interception
|
|
|
|
This script automatically spawns a ChromeDriver instance and captures
|
|
all MEXC futures trading requests in real-time, including full request
|
|
and response data needed for reverse engineering.
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
import json
|
|
import sys
|
|
import os
|
|
from typing import Dict, List, Optional, Any
|
|
from datetime import datetime
|
|
import threading
|
|
import queue
|
|
|
|
# Selenium imports
|
|
try:
|
|
from selenium import webdriver
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import TimeoutException, WebDriverException
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
except ImportError:
|
|
print("Please install selenium and webdriver-manager:")
|
|
print("pip install selenium webdriver-manager")
|
|
sys.exit(1)
|
|
|
|
# Setup logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MEXCRequestInterceptor:
|
|
"""
|
|
Automatically spawns ChromeDriver and intercepts all MEXC API requests
|
|
"""
|
|
|
|
def __init__(self, headless: bool = False, save_to_file: bool = True):
|
|
"""
|
|
Initialize the request interceptor
|
|
|
|
Args:
|
|
headless: Run browser in headless mode
|
|
save_to_file: Save captured requests to JSON file
|
|
"""
|
|
self.driver = None
|
|
self.headless = headless
|
|
self.save_to_file = save_to_file
|
|
self.captured_requests = []
|
|
self.captured_responses = []
|
|
self.session_cookies = {}
|
|
self.monitoring = False
|
|
self.request_queue = queue.Queue()
|
|
|
|
# File paths for saving data
|
|
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
self.requests_file = f"mexc_requests_{self.timestamp}.json"
|
|
self.cookies_file = f"mexc_cookies_{self.timestamp}.json"
|
|
|
|
def setup_browser(self):
|
|
"""Setup Chrome browser with necessary options"""
|
|
chrome_options = webdriver.ChromeOptions()
|
|
# Enable headless mode if needed
|
|
if self.headless:
|
|
chrome_options.add_argument('--headless')
|
|
chrome_options.add_argument('--disable-gpu')
|
|
chrome_options.add_argument('--window-size=1920,1080')
|
|
chrome_options.add_argument('--disable-extensions')
|
|
|
|
# Set up Chrome options with a user data directory to persist session
|
|
user_data_base_dir = os.path.join(os.getcwd(), 'chrome_user_data')
|
|
os.makedirs(user_data_base_dir, exist_ok=True)
|
|
|
|
# Check for existing session directories
|
|
session_dirs = [d for d in os.listdir(user_data_base_dir) if d.startswith('session_')]
|
|
session_dirs.sort(reverse=True) # Sort descending to get the most recent first
|
|
|
|
user_data_dir = None
|
|
if session_dirs:
|
|
use_existing = input(f"Found {len(session_dirs)} existing sessions. Use an existing session? (y/n): ").lower().strip() == 'y'
|
|
if use_existing:
|
|
print("Available sessions:")
|
|
for i, session in enumerate(session_dirs[:5], 1): # Show up to 5 most recent
|
|
print(f"{i}. {session}")
|
|
choice = input("Enter session number (default 1) or any other key for most recent: ")
|
|
if choice.isdigit() and 1 <= int(choice) <= len(session_dirs):
|
|
selected_session = session_dirs[int(choice) - 1]
|
|
else:
|
|
selected_session = session_dirs[0]
|
|
user_data_dir = os.path.join(user_data_base_dir, selected_session)
|
|
print(f"Using session: {selected_session}")
|
|
|
|
if user_data_dir is None:
|
|
user_data_dir = os.path.join(user_data_base_dir, f'session_{self.timestamp}')
|
|
os.makedirs(user_data_dir, exist_ok=True)
|
|
print(f"Creating new session: session_{self.timestamp}")
|
|
|
|
chrome_options.add_argument(f'--user-data-dir={user_data_dir}')
|
|
|
|
# Enable logging to capture JS console output and network activity
|
|
chrome_options.set_capability('goog:loggingPrefs', {
|
|
'browser': 'ALL',
|
|
'performance': 'ALL'
|
|
})
|
|
|
|
try:
|
|
self.driver = webdriver.Chrome(options=chrome_options)
|
|
except Exception as e:
|
|
print(f"Failed to start browser with session: {e}")
|
|
print("Falling back to a new session...")
|
|
user_data_dir = os.path.join(user_data_base_dir, f'session_{self.timestamp}_fallback')
|
|
os.makedirs(user_data_dir, exist_ok=True)
|
|
print(f"Creating fallback session: session_{self.timestamp}_fallback")
|
|
chrome_options = webdriver.ChromeOptions()
|
|
if self.headless:
|
|
chrome_options.add_argument('--headless')
|
|
chrome_options.add_argument('--disable-gpu')
|
|
chrome_options.add_argument('--window-size=1920,1080')
|
|
chrome_options.add_argument('--disable-extensions')
|
|
chrome_options.add_argument(f'--user-data-dir={user_data_dir}')
|
|
chrome_options.set_capability('goog:loggingPrefs', {
|
|
'browser': 'ALL',
|
|
'performance': 'ALL'
|
|
})
|
|
self.driver = webdriver.Chrome(options=chrome_options)
|
|
|
|
return self.driver
|
|
|
|
def start_monitoring(self):
|
|
"""Start the browser and begin monitoring"""
|
|
logger.info("Starting MEXC Request Interceptor...")
|
|
|
|
try:
|
|
# Setup ChromeDriver
|
|
self.driver = self.setup_browser()
|
|
|
|
# Navigate to MEXC futures
|
|
mexc_url = "https://www.mexc.com/en-GB/futures/ETH_USDT?type=linear_swap"
|
|
logger.info(f"Navigating to: {mexc_url}")
|
|
self.driver.get(mexc_url)
|
|
|
|
# Wait for page load
|
|
WebDriverWait(self.driver, 10).until(
|
|
EC.presence_of_element_located((By.TAG_NAME, "body"))
|
|
)
|
|
|
|
logger.info("✅ MEXC page loaded successfully!")
|
|
logger.info("📝 Please log in manually in the browser window")
|
|
logger.info("🔍 Request monitoring is now active...")
|
|
|
|
# Start monitoring in background thread
|
|
self.monitoring = True
|
|
monitor_thread = threading.Thread(target=self._monitor_requests, daemon=True)
|
|
monitor_thread.start()
|
|
|
|
# Wait for manual login
|
|
self._wait_for_login()
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to start monitoring: {e}")
|
|
return False
|
|
|
|
def _wait_for_login(self):
|
|
"""Wait for user to log in and show interactive menu"""
|
|
logger.info("\n" + "="*60)
|
|
logger.info("MEXC REQUEST INTERCEPTOR - INTERACTIVE MODE")
|
|
logger.info("="*60)
|
|
|
|
while True:
|
|
print("\nOptions:")
|
|
print("1. Check login status")
|
|
print("2. Extract current cookies")
|
|
print("3. Show captured requests summary")
|
|
print("4. Save captured data to files")
|
|
print("5. Perform test trade (manual)")
|
|
print("6. Monitor for 60 seconds")
|
|
print("0. Stop and exit")
|
|
|
|
choice = input("\nEnter choice (0-6): ").strip()
|
|
|
|
if choice == "1":
|
|
self._check_login_status()
|
|
elif choice == "2":
|
|
self._extract_cookies()
|
|
elif choice == "3":
|
|
self._show_requests_summary()
|
|
elif choice == "4":
|
|
self._save_all_data()
|
|
elif choice == "5":
|
|
self._guide_test_trade()
|
|
elif choice == "6":
|
|
self._monitor_for_duration(60)
|
|
elif choice == "0":
|
|
break
|
|
else:
|
|
print("Invalid choice. Please try again.")
|
|
|
|
self.stop_monitoring()
|
|
|
|
def _check_login_status(self):
|
|
"""Check if user is logged into MEXC"""
|
|
try:
|
|
cookies = self.driver.get_cookies()
|
|
auth_cookies = ['uc_token', 'u_id', 'x-mxc-fingerprint']
|
|
found_auth = []
|
|
|
|
for cookie in cookies:
|
|
if cookie['name'] in auth_cookies and cookie['value']:
|
|
found_auth.append(cookie['name'])
|
|
|
|
if len(found_auth) >= 2:
|
|
print("✅ LOGIN DETECTED - You appear to be logged in!")
|
|
print(f" Found auth cookies: {', '.join(found_auth)}")
|
|
return True
|
|
else:
|
|
print("❌ NOT LOGGED IN - Please log in to MEXC in the browser")
|
|
print(" Missing required authentication cookies")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error checking login: {e}")
|
|
return False
|
|
|
|
def _extract_cookies(self):
|
|
"""Extract and display current session cookies"""
|
|
try:
|
|
cookies = self.driver.get_cookies()
|
|
cookie_dict = {}
|
|
|
|
for cookie in cookies:
|
|
cookie_dict[cookie['name']] = cookie['value']
|
|
|
|
self.session_cookies = cookie_dict
|
|
|
|
print(f"\n📊 Extracted {len(cookie_dict)} cookies:")
|
|
|
|
# Show important cookies
|
|
important = ['uc_token', 'u_id', 'x-mxc-fingerprint', 'mexc_fingerprint_visitorId']
|
|
for name in important:
|
|
if name in cookie_dict:
|
|
value = cookie_dict[name]
|
|
display_value = value[:20] + "..." if len(value) > 20 else value
|
|
print(f" ✅ {name}: {display_value}")
|
|
else:
|
|
print(f" ❌ {name}: Missing")
|
|
|
|
# Save cookies to file
|
|
if self.save_to_file:
|
|
with open(self.cookies_file, 'w') as f:
|
|
json.dump(cookie_dict, f, indent=2)
|
|
print(f"\n💾 Cookies saved to: {self.cookies_file}")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error extracting cookies: {e}")
|
|
|
|
def _monitor_requests(self):
|
|
"""Background thread to monitor network requests"""
|
|
last_log_count = 0
|
|
|
|
while self.monitoring:
|
|
try:
|
|
# Get performance logs
|
|
logs = self.driver.get_log('performance')
|
|
|
|
for log in logs:
|
|
try:
|
|
message = json.loads(log['message'])
|
|
method = message.get('message', {}).get('method', '')
|
|
|
|
# Capture network requests
|
|
if method == 'Network.requestWillBeSent':
|
|
self._process_request(message['message']['params'])
|
|
elif method == 'Network.responseReceived':
|
|
self._process_response(message['message']['params'])
|
|
|
|
except (json.JSONDecodeError, KeyError) as e:
|
|
continue
|
|
|
|
# Show progress every 10 new requests
|
|
if len(self.captured_requests) >= last_log_count + 10:
|
|
last_log_count = len(self.captured_requests)
|
|
logger.info(f"📈 Captured {len(self.captured_requests)} requests, {len(self.captured_responses)} responses")
|
|
|
|
except Exception as e:
|
|
if self.monitoring: # Only log if we're still supposed to be monitoring
|
|
logger.debug(f"Monitor error: {e}")
|
|
|
|
time.sleep(0.5) # Check every 500ms
|
|
|
|
def _process_request(self, request_data):
|
|
"""Process a captured network request"""
|
|
try:
|
|
url = request_data.get('request', {}).get('url', '')
|
|
|
|
# Filter for MEXC API requests
|
|
if self._is_mexc_request(url):
|
|
request_info = {
|
|
'type': 'request',
|
|
'timestamp': datetime.now().isoformat(),
|
|
'url': url,
|
|
'method': request_data.get('request', {}).get('method', ''),
|
|
'headers': request_data.get('request', {}).get('headers', {}),
|
|
'postData': request_data.get('request', {}).get('postData', ''),
|
|
'requestId': request_data.get('requestId', '')
|
|
}
|
|
|
|
self.captured_requests.append(request_info)
|
|
|
|
# Show important requests immediately
|
|
if ('futures.mexc.com' in url or 'captcha' in url):
|
|
print(f"\n🚀 CAPTURED REQUEST: {request_info['method']} {url}")
|
|
if request_info['postData']:
|
|
print(f" 📄 POST Data: {request_info['postData'][:100]}...")
|
|
|
|
# Enhanced captcha detection and detailed logging
|
|
if 'captcha' in url.lower() or 'robot' in url.lower():
|
|
logger.info(f"CAPTCHA REQUEST DETECTED: {request_data.get('request', {}).get('method', 'UNKNOWN')} {url}")
|
|
logger.info(f" Headers: {request_data.get('request', {}).get('headers', {})}")
|
|
if request_data.get('request', {}).get('postData', ''):
|
|
logger.info(f" Data: {request_data.get('request', {}).get('postData', '')}")
|
|
# Attempt to capture related JavaScript or DOM elements (if possible)
|
|
if self.driver is not None:
|
|
try:
|
|
js_snippet = self.driver.execute_script("return document.querySelector('script[src*=\"captcha\"]') ? document.querySelector('script[src*=\"captcha\"]').outerHTML : 'No captcha script found';")
|
|
logger.info(f" Related JS Snippet: {js_snippet}")
|
|
except Exception as e:
|
|
logger.warning(f" Could not capture JS snippet: {e}")
|
|
try:
|
|
dom_element = self.driver.execute_script("return document.querySelector('div[id*=\"captcha\"]') ? document.querySelector('div[id*=\"captcha\"]').outerHTML : 'No captcha element found';")
|
|
logger.info(f" Related DOM Element: {dom_element}")
|
|
except Exception as e:
|
|
logger.warning(f" Could not capture DOM element: {e}")
|
|
else:
|
|
logger.warning(" Driver not initialized, cannot capture JS or DOM elements")
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Error processing request: {e}")
|
|
|
|
def _process_response(self, response_data):
|
|
"""Process a captured network response"""
|
|
try:
|
|
url = response_data.get('response', {}).get('url', '')
|
|
|
|
# Filter for MEXC API responses
|
|
if self._is_mexc_request(url):
|
|
response_info = {
|
|
'type': 'response',
|
|
'timestamp': datetime.now().isoformat(),
|
|
'url': url,
|
|
'status': response_data.get('response', {}).get('status', 0),
|
|
'headers': response_data.get('response', {}).get('headers', {}),
|
|
'requestId': response_data.get('requestId', '')
|
|
}
|
|
|
|
self.captured_responses.append(response_info)
|
|
|
|
# Show important responses immediately
|
|
if ('futures.mexc.com' in url or 'captcha' in url):
|
|
status = response_info['status']
|
|
status_emoji = "✅" if status == 200 else "❌"
|
|
print(f" {status_emoji} RESPONSE: {status} for {url}")
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Error processing response: {e}")
|
|
|
|
def _is_mexc_request(self, url: str) -> bool:
|
|
"""Check if URL is a relevant MEXC API request"""
|
|
mexc_indicators = [
|
|
'futures.mexc.com',
|
|
'ucgateway/captcha_api',
|
|
'api/v1/private',
|
|
'api/v3/order',
|
|
'mexc.com/api'
|
|
]
|
|
|
|
return any(indicator in url for indicator in mexc_indicators)
|
|
|
|
def _show_requests_summary(self):
|
|
"""Show summary of captured requests"""
|
|
print(f"\n📊 CAPTURE SUMMARY:")
|
|
print(f" Total Requests: {len(self.captured_requests)}")
|
|
print(f" Total Responses: {len(self.captured_responses)}")
|
|
|
|
# Group by URL pattern
|
|
url_counts = {}
|
|
for req in self.captured_requests:
|
|
base_url = req['url'].split('?')[0] # Remove query params
|
|
url_counts[base_url] = url_counts.get(base_url, 0) + 1
|
|
|
|
print("\n🔗 Top URLs:")
|
|
for url, count in sorted(url_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
|
|
print(f" {count}x {url}")
|
|
|
|
# Show recent futures API calls
|
|
futures_requests = [r for r in self.captured_requests if 'futures.mexc.com' in r['url']]
|
|
if futures_requests:
|
|
print(f"\n🚀 Futures API Calls: {len(futures_requests)}")
|
|
for req in futures_requests[-3:]: # Show last 3
|
|
print(f" {req['method']} {req['url']}")
|
|
|
|
def _save_all_data(self):
|
|
"""Save all captured data to files"""
|
|
if not self.save_to_file:
|
|
print("File saving is disabled")
|
|
return
|
|
|
|
try:
|
|
# Save requests
|
|
with open(self.requests_file, 'w') as f:
|
|
json.dump({
|
|
'requests': self.captured_requests,
|
|
'responses': self.captured_responses,
|
|
'summary': {
|
|
'total_requests': len(self.captured_requests),
|
|
'total_responses': len(self.captured_responses),
|
|
'capture_session': self.timestamp
|
|
}
|
|
}, f, indent=2)
|
|
|
|
# Save cookies if we have them
|
|
if self.session_cookies:
|
|
with open(self.cookies_file, 'w') as f:
|
|
json.dump(self.session_cookies, f, indent=2)
|
|
|
|
print(f"\n💾 Data saved to:")
|
|
print(f" 📋 Requests: {self.requests_file}")
|
|
if self.session_cookies:
|
|
print(f" 🍪 Cookies: {self.cookies_file}")
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error saving data: {e}")
|
|
|
|
def _guide_test_trade(self):
|
|
"""Guide user through performing a test trade"""
|
|
print("\n🧪 TEST TRADE GUIDE:")
|
|
print("1. Make sure you're logged into MEXC")
|
|
print("2. Go to the trading interface")
|
|
print("3. Try to place a SMALL test trade (it may fail, but we'll capture the requests)")
|
|
print("4. Watch the console for captured API calls")
|
|
print("\n⚠️ IMPORTANT: Use very small amounts for testing!")
|
|
input("\nPress Enter when you're ready to start monitoring...")
|
|
|
|
self._monitor_for_duration(120) # Monitor for 2 minutes
|
|
|
|
def _monitor_for_duration(self, seconds: int):
|
|
"""Monitor requests for a specific duration"""
|
|
print(f"\n🔍 Monitoring requests for {seconds} seconds...")
|
|
print("Perform your trading actions now!")
|
|
|
|
start_time = time.time()
|
|
initial_count = len(self.captured_requests)
|
|
|
|
while time.time() - start_time < seconds:
|
|
current_count = len(self.captured_requests)
|
|
new_requests = current_count - initial_count
|
|
|
|
remaining = seconds - int(time.time() - start_time)
|
|
print(f"\r⏱️ Time remaining: {remaining}s | New requests: {new_requests}", end="", flush=True)
|
|
|
|
time.sleep(1)
|
|
|
|
final_count = len(self.captured_requests)
|
|
new_total = final_count - initial_count
|
|
print(f"\n✅ Monitoring complete! Captured {new_total} new requests")
|
|
|
|
def stop_monitoring(self):
|
|
"""Stop monitoring and close browser"""
|
|
logger.info("Stopping request monitoring...")
|
|
self.monitoring = False
|
|
|
|
if self.driver:
|
|
self.driver.quit()
|
|
logger.info("Browser closed")
|
|
|
|
# Final save
|
|
if self.save_to_file and (self.captured_requests or self.captured_responses):
|
|
self._save_all_data()
|
|
logger.info("Final data save complete")
|
|
|
|
def main():
|
|
"""Main function to run the interceptor"""
|
|
print("🚀 MEXC Request Interceptor with ChromeDriver")
|
|
print("=" * 50)
|
|
print("This will automatically:")
|
|
print("✅ Download/setup ChromeDriver")
|
|
print("✅ Open MEXC futures page")
|
|
print("✅ Capture all API requests/responses")
|
|
print("✅ Extract session cookies")
|
|
print("✅ Save data to JSON files")
|
|
print("\nPress Ctrl+C to stop at any time")
|
|
|
|
# Ask for preferences
|
|
headless = input("\nRun in headless mode? (y/n): ").lower().strip() == 'y'
|
|
|
|
interceptor = MEXCRequestInterceptor(headless=headless, save_to_file=True)
|
|
|
|
try:
|
|
success = interceptor.start_monitoring()
|
|
if not success:
|
|
print("❌ Failed to start monitoring")
|
|
return
|
|
|
|
except KeyboardInterrupt:
|
|
print("\n\n⏹️ Stopping interceptor...")
|
|
except Exception as e:
|
|
print(f"\n❌ Error: {e}")
|
|
finally:
|
|
interceptor.stop_monitoring()
|
|
print("\n👋 Goodbye!")
|
|
|
|
if __name__ == "__main__":
|
|
main() |