mexc futures web client wip

This commit is contained in:
Dobromir Popov
2025-06-24 12:59:28 +03:00
parent d902e01197
commit 1f3166e1e5
11 changed files with 2139 additions and 0 deletions

View File

@ -0,0 +1,502 @@
#!/usr/bin/env python3
"""
MEXC Auto Browser with Request Interception
This script automatically spawns a ChromeDriver instance and captures
all MEXC futures trading requests in real-time, including full request
and response data needed for reverse engineering.
"""
import logging
import time
import json
import sys
import os
from typing import Dict, List, Optional, Any
from datetime import datetime
import threading
import queue
# Selenium imports
try:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
except ImportError:
print("Please install selenium and webdriver-manager:")
print("pip install selenium webdriver-manager")
sys.exit(1)
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MEXCRequestInterceptor:
"""
Automatically spawns ChromeDriver and intercepts all MEXC API requests
"""
def __init__(self, headless: bool = False, save_to_file: bool = True):
"""
Initialize the request interceptor
Args:
headless: Run browser in headless mode
save_to_file: Save captured requests to JSON file
"""
self.driver = None
self.headless = headless
self.save_to_file = save_to_file
self.captured_requests = []
self.captured_responses = []
self.session_cookies = {}
self.monitoring = False
self.request_queue = queue.Queue()
# File paths for saving data
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.requests_file = f"mexc_requests_{self.timestamp}.json"
self.cookies_file = f"mexc_cookies_{self.timestamp}.json"
def setup_chrome_with_logging(self) -> webdriver.Chrome:
"""Setup Chrome with performance logging enabled"""
logger.info("Setting up ChromeDriver with request interception...")
# Chrome options
chrome_options = Options()
if self.headless:
chrome_options.add_argument("--headless")
logger.info("Running in headless mode")
# Essential options for automation
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--disable-web-security")
chrome_options.add_argument("--allow-running-insecure-content")
chrome_options.add_argument("--disable-features=VizDisplayCompositor")
# User agent to avoid detection
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"
chrome_options.add_argument(f"--user-agent={user_agent}")
# Disable automation flags
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
# Enable performance logging for network requests
chrome_options.add_argument("--enable-logging")
chrome_options.add_argument("--log-level=0")
chrome_options.add_argument("--v=1")
# Set capabilities for performance logging
caps = DesiredCapabilities.CHROME
caps['goog:loggingPrefs'] = {
'performance': 'ALL',
'browser': 'ALL'
}
try:
# Automatically download and install ChromeDriver
logger.info("Downloading/updating ChromeDriver...")
service = Service(ChromeDriverManager().install())
# Create driver
driver = webdriver.Chrome(
service=service,
options=chrome_options,
desired_capabilities=caps
)
# Hide automation indicators
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
driver.execute_cdp_cmd('Network.setUserAgentOverride', {
"userAgent": user_agent
})
# Enable network domain for CDP
driver.execute_cdp_cmd('Network.enable', {})
driver.execute_cdp_cmd('Runtime.enable', {})
logger.info("ChromeDriver setup complete!")
return driver
except Exception as e:
logger.error(f"Failed to setup ChromeDriver: {e}")
raise
def start_monitoring(self):
"""Start the browser and begin monitoring"""
logger.info("Starting MEXC Request Interceptor...")
try:
# Setup ChromeDriver
self.driver = self.setup_chrome_with_logging()
# Navigate to MEXC futures
mexc_url = "https://www.mexc.com/en-GB/futures/ETH_USDT?type=linear_swap"
logger.info(f"Navigating to: {mexc_url}")
self.driver.get(mexc_url)
# Wait for page load
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
logger.info("✅ MEXC page loaded successfully!")
logger.info("📝 Please log in manually in the browser window")
logger.info("🔍 Request monitoring is now active...")
# Start monitoring in background thread
self.monitoring = True
monitor_thread = threading.Thread(target=self._monitor_requests, daemon=True)
monitor_thread.start()
# Wait for manual login
self._wait_for_login()
return True
except Exception as e:
logger.error(f"Failed to start monitoring: {e}")
return False
def _wait_for_login(self):
"""Wait for user to log in and show interactive menu"""
logger.info("\n" + "="*60)
logger.info("MEXC REQUEST INTERCEPTOR - INTERACTIVE MODE")
logger.info("="*60)
while True:
print("\nOptions:")
print("1. Check login status")
print("2. Extract current cookies")
print("3. Show captured requests summary")
print("4. Save captured data to files")
print("5. Perform test trade (manual)")
print("6. Monitor for 60 seconds")
print("0. Stop and exit")
choice = input("\nEnter choice (0-6): ").strip()
if choice == "1":
self._check_login_status()
elif choice == "2":
self._extract_cookies()
elif choice == "3":
self._show_requests_summary()
elif choice == "4":
self._save_all_data()
elif choice == "5":
self._guide_test_trade()
elif choice == "6":
self._monitor_for_duration(60)
elif choice == "0":
break
else:
print("Invalid choice. Please try again.")
self.stop_monitoring()
def _check_login_status(self):
"""Check if user is logged into MEXC"""
try:
cookies = self.driver.get_cookies()
auth_cookies = ['uc_token', 'u_id', 'x-mxc-fingerprint']
found_auth = []
for cookie in cookies:
if cookie['name'] in auth_cookies and cookie['value']:
found_auth.append(cookie['name'])
if len(found_auth) >= 2:
print("✅ LOGIN DETECTED - You appear to be logged in!")
print(f" Found auth cookies: {', '.join(found_auth)}")
return True
else:
print("❌ NOT LOGGED IN - Please log in to MEXC in the browser")
print(" Missing required authentication cookies")
return False
except Exception as e:
print(f"❌ Error checking login: {e}")
return False
def _extract_cookies(self):
"""Extract and display current session cookies"""
try:
cookies = self.driver.get_cookies()
cookie_dict = {}
for cookie in cookies:
cookie_dict[cookie['name']] = cookie['value']
self.session_cookies = cookie_dict
print(f"\n📊 Extracted {len(cookie_dict)} cookies:")
# Show important cookies
important = ['uc_token', 'u_id', 'x-mxc-fingerprint', 'mexc_fingerprint_visitorId']
for name in important:
if name in cookie_dict:
value = cookie_dict[name]
display_value = value[:20] + "..." if len(value) > 20 else value
print(f"{name}: {display_value}")
else:
print(f"{name}: Missing")
# Save cookies to file
if self.save_to_file:
with open(self.cookies_file, 'w') as f:
json.dump(cookie_dict, f, indent=2)
print(f"\n💾 Cookies saved to: {self.cookies_file}")
except Exception as e:
print(f"❌ Error extracting cookies: {e}")
def _monitor_requests(self):
"""Background thread to monitor network requests"""
last_log_count = 0
while self.monitoring:
try:
# Get performance logs
logs = self.driver.get_log('performance')
for log in logs:
try:
message = json.loads(log['message'])
method = message.get('message', {}).get('method', '')
# Capture network requests
if method == 'Network.requestWillBeSent':
self._process_request(message['message']['params'])
elif method == 'Network.responseReceived':
self._process_response(message['message']['params'])
except (json.JSONDecodeError, KeyError) as e:
continue
# Show progress every 10 new requests
if len(self.captured_requests) >= last_log_count + 10:
last_log_count = len(self.captured_requests)
logger.info(f"📈 Captured {len(self.captured_requests)} requests, {len(self.captured_responses)} responses")
except Exception as e:
if self.monitoring: # Only log if we're still supposed to be monitoring
logger.debug(f"Monitor error: {e}")
time.sleep(0.5) # Check every 500ms
def _process_request(self, request_data):
"""Process a captured network request"""
try:
url = request_data.get('request', {}).get('url', '')
# Filter for MEXC API requests
if self._is_mexc_request(url):
request_info = {
'type': 'request',
'timestamp': datetime.now().isoformat(),
'url': url,
'method': request_data.get('request', {}).get('method', ''),
'headers': request_data.get('request', {}).get('headers', {}),
'postData': request_data.get('request', {}).get('postData', ''),
'requestId': request_data.get('requestId', '')
}
self.captured_requests.append(request_info)
# Show important requests immediately
if ('futures.mexc.com' in url or 'captcha' in url):
print(f"\n🚀 CAPTURED REQUEST: {request_info['method']} {url}")
if request_info['postData']:
print(f" 📄 POST Data: {request_info['postData'][:100]}...")
except Exception as e:
logger.debug(f"Error processing request: {e}")
def _process_response(self, response_data):
"""Process a captured network response"""
try:
url = response_data.get('response', {}).get('url', '')
# Filter for MEXC API responses
if self._is_mexc_request(url):
response_info = {
'type': 'response',
'timestamp': datetime.now().isoformat(),
'url': url,
'status': response_data.get('response', {}).get('status', 0),
'headers': response_data.get('response', {}).get('headers', {}),
'requestId': response_data.get('requestId', '')
}
self.captured_responses.append(response_info)
# Show important responses immediately
if ('futures.mexc.com' in url or 'captcha' in url):
status = response_info['status']
status_emoji = "" if status == 200 else ""
print(f" {status_emoji} RESPONSE: {status} for {url}")
except Exception as e:
logger.debug(f"Error processing response: {e}")
def _is_mexc_request(self, url: str) -> bool:
"""Check if URL is a relevant MEXC API request"""
mexc_indicators = [
'futures.mexc.com',
'ucgateway/captcha_api',
'api/v1/private',
'api/v3/order',
'mexc.com/api'
]
return any(indicator in url for indicator in mexc_indicators)
def _show_requests_summary(self):
"""Show summary of captured requests"""
print(f"\n📊 CAPTURE SUMMARY:")
print(f" Total Requests: {len(self.captured_requests)}")
print(f" Total Responses: {len(self.captured_responses)}")
# Group by URL pattern
url_counts = {}
for req in self.captured_requests:
base_url = req['url'].split('?')[0] # Remove query params
url_counts[base_url] = url_counts.get(base_url, 0) + 1
print("\n🔗 Top URLs:")
for url, count in sorted(url_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
print(f" {count}x {url}")
# Show recent futures API calls
futures_requests = [r for r in self.captured_requests if 'futures.mexc.com' in r['url']]
if futures_requests:
print(f"\n🚀 Futures API Calls: {len(futures_requests)}")
for req in futures_requests[-3:]: # Show last 3
print(f" {req['method']} {req['url']}")
def _save_all_data(self):
"""Save all captured data to files"""
if not self.save_to_file:
print("File saving is disabled")
return
try:
# Save requests
with open(self.requests_file, 'w') as f:
json.dump({
'requests': self.captured_requests,
'responses': self.captured_responses,
'summary': {
'total_requests': len(self.captured_requests),
'total_responses': len(self.captured_responses),
'capture_session': self.timestamp
}
}, f, indent=2)
# Save cookies if we have them
if self.session_cookies:
with open(self.cookies_file, 'w') as f:
json.dump(self.session_cookies, f, indent=2)
print(f"\n💾 Data saved to:")
print(f" 📋 Requests: {self.requests_file}")
if self.session_cookies:
print(f" 🍪 Cookies: {self.cookies_file}")
except Exception as e:
print(f"❌ Error saving data: {e}")
def _guide_test_trade(self):
"""Guide user through performing a test trade"""
print("\n🧪 TEST TRADE GUIDE:")
print("1. Make sure you're logged into MEXC")
print("2. Go to the trading interface")
print("3. Try to place a SMALL test trade (it may fail, but we'll capture the requests)")
print("4. Watch the console for captured API calls")
print("\n⚠️ IMPORTANT: Use very small amounts for testing!")
input("\nPress Enter when you're ready to start monitoring...")
self._monitor_for_duration(120) # Monitor for 2 minutes
def _monitor_for_duration(self, seconds: int):
"""Monitor requests for a specific duration"""
print(f"\n🔍 Monitoring requests for {seconds} seconds...")
print("Perform your trading actions now!")
start_time = time.time()
initial_count = len(self.captured_requests)
while time.time() - start_time < seconds:
current_count = len(self.captured_requests)
new_requests = current_count - initial_count
remaining = seconds - int(time.time() - start_time)
print(f"\r⏱️ Time remaining: {remaining}s | New requests: {new_requests}", end="", flush=True)
time.sleep(1)
final_count = len(self.captured_requests)
new_total = final_count - initial_count
print(f"\n✅ Monitoring complete! Captured {new_total} new requests")
def stop_monitoring(self):
"""Stop monitoring and close browser"""
logger.info("Stopping request monitoring...")
self.monitoring = False
if self.driver:
self.driver.quit()
logger.info("Browser closed")
# Final save
if self.save_to_file and (self.captured_requests or self.captured_responses):
self._save_all_data()
logger.info("Final data save complete")
def main():
"""Main function to run the interceptor"""
print("🚀 MEXC Request Interceptor with ChromeDriver")
print("=" * 50)
print("This will automatically:")
print("✅ Download/setup ChromeDriver")
print("✅ Open MEXC futures page")
print("✅ Capture all API requests/responses")
print("✅ Extract session cookies")
print("✅ Save data to JSON files")
print("\nPress Ctrl+C to stop at any time")
# Ask for preferences
headless = input("\nRun in headless mode? (y/n): ").lower().strip() == 'y'
interceptor = MEXCRequestInterceptor(headless=headless, save_to_file=True)
try:
success = interceptor.start_monitoring()
if not success:
print("❌ Failed to start monitoring")
return
except KeyboardInterrupt:
print("\n\n⏹️ Stopping interceptor...")
except Exception as e:
print(f"\n❌ Error: {e}")
finally:
interceptor.stop_monitoring()
print("\n👋 Goodbye!")
if __name__ == "__main__":
main()