BIG CLEANUP

This commit is contained in:
Dobromir Popov
2025-08-08 14:58:55 +03:00
parent e39e9ee95a
commit 2b0d2679c6
162 changed files with 455 additions and 42814 deletions

View File

@ -1,8 +0,0 @@
# MEXC Web Client Module
#
# This module provides web-based trading capabilities for MEXC futures trading
# which is not supported by their official API.
from .mexc_futures_client import MEXCFuturesWebClient
__all__ = ['MEXCFuturesWebClient']

View File

@ -1,555 +0,0 @@
#!/usr/bin/env python3
"""
MEXC Auto Browser with Request Interception
This script automatically spawns a ChromeDriver instance and captures
all MEXC futures trading requests in real-time, including full request
and response data needed for reverse engineering.
"""
import logging
import time
import json
import sys
import os
from typing import Dict, List, Optional, Any
from datetime import datetime
import threading
import queue
# Selenium imports
try:
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
from webdriver_manager.chrome import ChromeDriverManager
except ImportError:
print("Please install selenium and webdriver-manager:")
print("pip install selenium webdriver-manager")
sys.exit(1)
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MEXCRequestInterceptor:
"""
Automatically spawns ChromeDriver and intercepts all MEXC API requests
"""
def __init__(self, headless: bool = False, save_to_file: bool = True):
"""
Initialize the request interceptor
Args:
headless: Run browser in headless mode
save_to_file: Save captured requests to JSON file
"""
self.driver = None
self.headless = headless
self.save_to_file = save_to_file
self.captured_requests = []
self.captured_responses = []
self.session_cookies = {}
self.monitoring = False
self.request_queue = queue.Queue()
# File paths for saving data
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.requests_file = f"mexc_requests_{self.timestamp}.json"
self.cookies_file = f"mexc_cookies_{self.timestamp}.json"
def setup_browser(self):
"""Setup Chrome browser with necessary options"""
chrome_options = webdriver.ChromeOptions()
# Enable headless mode if needed
if self.headless:
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument('--disable-extensions')
# Set up Chrome options with a user data directory to persist session
user_data_base_dir = os.path.join(os.getcwd(), 'chrome_user_data')
os.makedirs(user_data_base_dir, exist_ok=True)
# Check for existing session directories
session_dirs = [d for d in os.listdir(user_data_base_dir) if d.startswith('session_')]
session_dirs.sort(reverse=True) # Sort descending to get the most recent first
user_data_dir = None
if session_dirs:
use_existing = input(f"Found {len(session_dirs)} existing sessions. Use an existing session? (y/n): ").lower().strip() == 'y'
if use_existing:
print("Available sessions:")
for i, session in enumerate(session_dirs[:5], 1): # Show up to 5 most recent
print(f"{i}. {session}")
choice = input("Enter session number (default 1) or any other key for most recent: ")
if choice.isdigit() and 1 <= int(choice) <= len(session_dirs):
selected_session = session_dirs[int(choice) - 1]
else:
selected_session = session_dirs[0]
user_data_dir = os.path.join(user_data_base_dir, selected_session)
print(f"Using session: {selected_session}")
if user_data_dir is None:
user_data_dir = os.path.join(user_data_base_dir, f'session_{self.timestamp}')
os.makedirs(user_data_dir, exist_ok=True)
print(f"Creating new session: session_{self.timestamp}")
chrome_options.add_argument(f'--user-data-dir={user_data_dir}')
# Enable logging to capture JS console output and network activity
chrome_options.set_capability('goog:loggingPrefs', {
'browser': 'ALL',
'performance': 'ALL'
})
try:
self.driver = webdriver.Chrome(options=chrome_options)
except Exception as e:
print(f"Failed to start browser with session: {e}")
print("Falling back to a new session...")
user_data_dir = os.path.join(user_data_base_dir, f'session_{self.timestamp}_fallback')
os.makedirs(user_data_dir, exist_ok=True)
print(f"Creating fallback session: session_{self.timestamp}_fallback")
chrome_options = webdriver.ChromeOptions()
if self.headless:
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument('--disable-extensions')
chrome_options.add_argument(f'--user-data-dir={user_data_dir}')
chrome_options.set_capability('goog:loggingPrefs', {
'browser': 'ALL',
'performance': 'ALL'
})
self.driver = webdriver.Chrome(options=chrome_options)
return self.driver
def start_monitoring(self):
"""Start the browser and begin monitoring"""
logger.info("Starting MEXC Request Interceptor...")
try:
# Setup ChromeDriver
self.driver = self.setup_browser()
# Navigate to MEXC futures
mexc_url = "https://www.mexc.com/en-GB/futures/ETH_USDT?type=linear_swap"
logger.info(f"Navigating to: {mexc_url}")
self.driver.get(mexc_url)
# Wait for page load
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
logger.info("✅ MEXC page loaded successfully!")
logger.info("📝 Please log in manually in the browser window")
logger.info("🔍 Request monitoring is now active...")
# Start monitoring in background thread
self.monitoring = True
monitor_thread = threading.Thread(target=self._monitor_requests, daemon=True)
monitor_thread.start()
# Wait for manual login
self._wait_for_login()
return True
except Exception as e:
logger.error(f"Failed to start monitoring: {e}")
return False
def _wait_for_login(self):
"""Wait for user to log in and show interactive menu"""
logger.info("\n" + "="*60)
logger.info("MEXC REQUEST INTERCEPTOR - INTERACTIVE MODE")
logger.info("="*60)
while True:
print("\nOptions:")
print("1. Check login status")
print("2. Extract current cookies")
print("3. Show captured requests summary")
print("4. Save captured data to files")
print("5. Perform test trade (manual)")
print("6. Monitor for 60 seconds")
print("0. Stop and exit")
choice = input("\nEnter choice (0-6): ").strip()
if choice == "1":
self._check_login_status()
elif choice == "2":
self._extract_cookies()
elif choice == "3":
self._show_requests_summary()
elif choice == "4":
self._save_all_data()
elif choice == "5":
self._guide_test_trade()
elif choice == "6":
self._monitor_for_duration(60)
elif choice == "0":
break
else:
print("Invalid choice. Please try again.")
self.stop_monitoring()
def _check_login_status(self):
"""Check if user is logged into MEXC"""
try:
cookies = self.driver.get_cookies()
auth_cookies = ['uc_token', 'u_id', 'x-mxc-fingerprint']
found_auth = []
for cookie in cookies:
if cookie['name'] in auth_cookies and cookie['value']:
found_auth.append(cookie['name'])
if len(found_auth) >= 2:
print("✅ LOGIN DETECTED - You appear to be logged in!")
print(f" Found auth cookies: {', '.join(found_auth)}")
return True
else:
print("❌ NOT LOGGED IN - Please log in to MEXC in the browser")
print(" Missing required authentication cookies")
return False
except Exception as e:
print(f"❌ Error checking login: {e}")
return False
def _extract_cookies(self):
"""Extract and display current session cookies"""
try:
cookies = self.driver.get_cookies()
cookie_dict = {}
for cookie in cookies:
cookie_dict[cookie['name']] = cookie['value']
self.session_cookies = cookie_dict
print(f"\n📊 Extracted {len(cookie_dict)} cookies:")
# Show important cookies
important = ['uc_token', 'u_id', 'x-mxc-fingerprint', 'mexc_fingerprint_visitorId']
for name in important:
if name in cookie_dict:
value = cookie_dict[name]
display_value = value[:20] + "..." if len(value) > 20 else value
print(f"{name}: {display_value}")
else:
print(f"{name}: Missing")
# Save cookies to file
if self.save_to_file:
with open(self.cookies_file, 'w') as f:
json.dump(cookie_dict, f, indent=2)
print(f"\n💾 Cookies saved to: {self.cookies_file}")
except Exception as e:
print(f"❌ Error extracting cookies: {e}")
def _monitor_requests(self):
"""Background thread to monitor network requests"""
last_log_count = 0
while self.monitoring:
try:
# Get performance logs
logs = self.driver.get_log('performance')
for log in logs:
try:
message = json.loads(log['message'])
method = message.get('message', {}).get('method', '')
# Capture network requests
if method == 'Network.requestWillBeSent':
self._process_request(message['message']['params'])
elif method == 'Network.responseReceived':
self._process_response(message['message']['params'])
except (json.JSONDecodeError, KeyError) as e:
continue
# Show progress every 10 new requests
if len(self.captured_requests) >= last_log_count + 10:
last_log_count = len(self.captured_requests)
logger.info(f"📈 Captured {len(self.captured_requests)} requests, {len(self.captured_responses)} responses")
except Exception as e:
if self.monitoring: # Only log if we're still supposed to be monitoring
logger.debug(f"Monitor error: {e}")
time.sleep(0.5) # Check every 500ms
def _process_request(self, request_data):
"""Process a captured network request"""
try:
url = request_data.get('request', {}).get('url', '')
# Filter for MEXC API requests
if self._is_mexc_request(url):
request_info = {
'type': 'request',
'timestamp': datetime.now().isoformat(),
'url': url,
'method': request_data.get('request', {}).get('method', ''),
'headers': request_data.get('request', {}).get('headers', {}),
'postData': request_data.get('request', {}).get('postData', ''),
'requestId': request_data.get('requestId', '')
}
self.captured_requests.append(request_info)
# Show important requests immediately
if ('futures.mexc.com' in url or 'captcha' in url):
print(f"\n🚀 CAPTURED REQUEST: {request_info['method']} {url}")
if request_info['postData']:
print(f" 📄 POST Data: {request_info['postData'][:100]}...")
# Enhanced captcha detection and detailed logging
if 'captcha' in url.lower() or 'robot' in url.lower():
logger.info(f"CAPTCHA REQUEST DETECTED: {request_data.get('request', {}).get('method', 'UNKNOWN')} {url}")
logger.info(f" Headers: {request_data.get('request', {}).get('headers', {})}")
if request_data.get('request', {}).get('postData', ''):
logger.info(f" Data: {request_data.get('request', {}).get('postData', '')}")
# Attempt to capture related JavaScript or DOM elements (if possible)
if self.driver is not None:
try:
js_snippet = self.driver.execute_script("return document.querySelector('script[src*=\"captcha\"]') ? document.querySelector('script[src*=\"captcha\"]').outerHTML : 'No captcha script found';")
logger.info(f" Related JS Snippet: {js_snippet}")
except Exception as e:
logger.warning(f" Could not capture JS snippet: {e}")
try:
dom_element = self.driver.execute_script("return document.querySelector('div[id*=\"captcha\"]') ? document.querySelector('div[id*=\"captcha\"]').outerHTML : 'No captcha element found';")
logger.info(f" Related DOM Element: {dom_element}")
except Exception as e:
logger.warning(f" Could not capture DOM element: {e}")
else:
logger.warning(" Driver not initialized, cannot capture JS or DOM elements")
except Exception as e:
logger.debug(f"Error processing request: {e}")
def _process_response(self, response_data):
"""Process a captured network response"""
try:
url = response_data.get('response', {}).get('url', '')
# Filter for MEXC API responses
if self._is_mexc_request(url):
response_info = {
'type': 'response',
'timestamp': datetime.now().isoformat(),
'url': url,
'status': response_data.get('response', {}).get('status', 0),
'headers': response_data.get('response', {}).get('headers', {}),
'requestId': response_data.get('requestId', '')
}
self.captured_responses.append(response_info)
# Show important responses immediately
if ('futures.mexc.com' in url or 'captcha' in url):
status = response_info['status']
status_emoji = "" if status == 200 else ""
print(f" {status_emoji} RESPONSE: {status} for {url}")
except Exception as e:
logger.debug(f"Error processing response: {e}")
def _is_mexc_request(self, url: str) -> bool:
"""Check if URL is a relevant MEXC API request"""
mexc_indicators = [
'futures.mexc.com',
'ucgateway/captcha_api',
'api/v1/private',
'api/v3/order',
'mexc.com/api'
]
return any(indicator in url for indicator in mexc_indicators)
def _show_requests_summary(self):
"""Show summary of captured requests"""
print(f"\n📊 CAPTURE SUMMARY:")
print(f" Total Requests: {len(self.captured_requests)}")
print(f" Total Responses: {len(self.captured_responses)}")
# Group by URL pattern
url_counts = {}
for req in self.captured_requests:
base_url = req['url'].split('?')[0] # Remove query params
url_counts[base_url] = url_counts.get(base_url, 0) + 1
print("\n🔗 Top URLs:")
for url, count in sorted(url_counts.items(), key=lambda x: x[1], reverse=True)[:5]:
print(f" {count}x {url}")
# Show recent futures API calls
futures_requests = [r for r in self.captured_requests if 'futures.mexc.com' in r['url']]
if futures_requests:
print(f"\n🚀 Futures API Calls: {len(futures_requests)}")
for req in futures_requests[-3:]: # Show last 3
print(f" {req['method']} {req['url']}")
def _save_all_data(self):
"""Save all captured data to files"""
if not self.save_to_file:
print("File saving is disabled")
return
try:
# Save requests
with open(self.requests_file, 'w') as f:
json.dump({
'requests': self.captured_requests,
'responses': self.captured_responses,
'summary': {
'total_requests': len(self.captured_requests),
'total_responses': len(self.captured_responses),
'capture_session': self.timestamp
}
}, f, indent=2)
# Save cookies if we have them
if self.session_cookies:
with open(self.cookies_file, 'w') as f:
json.dump(self.session_cookies, f, indent=2)
print(f"\n💾 Data saved to:")
print(f" 📋 Requests: {self.requests_file}")
if self.session_cookies:
print(f" 🍪 Cookies: {self.cookies_file}")
# Extract and save CAPTCHA tokens from captured requests
captcha_tokens = self.extract_captcha_tokens()
if captcha_tokens:
captcha_file = f"mexc_captcha_tokens_{self.timestamp}.json"
with open(captcha_file, 'w') as f:
json.dump(captcha_tokens, f, indent=2)
logger.info(f"Saved CAPTCHA tokens to {captcha_file}")
else:
logger.warning("No CAPTCHA tokens found in captured requests")
except Exception as e:
print(f"❌ Error saving data: {e}")
def _guide_test_trade(self):
"""Guide user through performing a test trade"""
print("\n🧪 TEST TRADE GUIDE:")
print("1. Make sure you're logged into MEXC")
print("2. Go to the trading interface")
print("3. Try to place a SMALL test trade (it may fail, but we'll capture the requests)")
print("4. Watch the console for captured API calls")
print("\n⚠️ IMPORTANT: Use very small amounts for testing!")
input("\nPress Enter when you're ready to start monitoring...")
self._monitor_for_duration(120) # Monitor for 2 minutes
def _monitor_for_duration(self, seconds: int):
"""Monitor requests for a specific duration"""
print(f"\n🔍 Monitoring requests for {seconds} seconds...")
print("Perform your trading actions now!")
start_time = time.time()
initial_count = len(self.captured_requests)
while time.time() - start_time < seconds:
current_count = len(self.captured_requests)
new_requests = current_count - initial_count
remaining = seconds - int(time.time() - start_time)
print(f"\r⏱️ Time remaining: {remaining}s | New requests: {new_requests}", end="", flush=True)
time.sleep(1)
final_count = len(self.captured_requests)
new_total = final_count - initial_count
print(f"\n✅ Monitoring complete! Captured {new_total} new requests")
def stop_monitoring(self):
"""Stop monitoring and close browser"""
logger.info("Stopping request monitoring...")
self.monitoring = False
if self.driver:
self.driver.quit()
logger.info("Browser closed")
# Final save
if self.save_to_file and (self.captured_requests or self.captured_responses):
self._save_all_data()
logger.info("Final data save complete")
def extract_captcha_tokens(self):
"""Extract CAPTCHA tokens from captured requests"""
captcha_tokens = []
for request in self.captured_requests:
if 'captcha-token' in request.get('headers', {}):
token = request['headers']['captcha-token']
captcha_tokens.append({
'token': token,
'url': request.get('url', ''),
'timestamp': request.get('timestamp', '')
})
elif 'captcha' in request.get('url', '').lower():
response = request.get('response', {})
if response and 'captcha-token' in response.get('headers', {}):
token = response['headers']['captcha-token']
captcha_tokens.append({
'token': token,
'url': request.get('url', ''),
'timestamp': request.get('timestamp', '')
})
return captcha_tokens
def main():
"""Main function to run the interceptor"""
print("🚀 MEXC Request Interceptor with ChromeDriver")
print("=" * 50)
print("This will automatically:")
print("✅ Download/setup ChromeDriver")
print("✅ Open MEXC futures page")
print("✅ Capture all API requests/responses")
print("✅ Extract session cookies")
print("✅ Save data to JSON files")
print("\nPress Ctrl+C to stop at any time")
# Ask for preferences
headless = input("\nRun in headless mode? (y/n): ").lower().strip() == 'y'
interceptor = MEXCRequestInterceptor(headless=headless, save_to_file=True)
try:
success = interceptor.start_monitoring()
if not success:
print("❌ Failed to start monitoring")
return
except KeyboardInterrupt:
print("\n\n⏹️ Stopping interceptor...")
except Exception as e:
print(f"\n❌ Error: {e}")
finally:
interceptor.stop_monitoring()
print("\n👋 Goodbye!")
if __name__ == "__main__":
main()

View File

@ -1,358 +0,0 @@
"""
MEXC Browser Automation for Cookie Extraction and Request Monitoring
This module uses Selenium to automate browser interactions and extract
session cookies and request data for MEXC futures trading.
"""
import logging
import time
import json
from typing import Dict, List, Optional, Any
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
logger = logging.getLogger(__name__)
class MEXCBrowserAutomation:
"""
Browser automation for MEXC futures trading session management
"""
def __init__(self, headless: bool = False, proxy: Optional[str] = None):
"""
Initialize browser automation
Args:
headless: Run browser in headless mode
proxy: HTTP proxy to use (format: host:port)
"""
self.driver = None
self.headless = headless
self.proxy = proxy
self.logged_in = False
def setup_chrome_driver(self) -> webdriver.Chrome:
"""Setup Chrome driver with appropriate options"""
chrome_options = Options()
if self.headless:
chrome_options.add_argument("--headless")
# Basic Chrome options for automation
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
# Set user agent to avoid detection
chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36")
# Proxy setup if provided
if self.proxy:
chrome_options.add_argument(f"--proxy-server=http://{self.proxy}")
# Enable network logging
chrome_options.add_argument("--enable-logging")
chrome_options.add_argument("--log-level=0")
chrome_options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
# Automatically download and setup ChromeDriver
service = Service(ChromeDriverManager().install())
try:
driver = webdriver.Chrome(service=service, options=chrome_options)
# Execute script to avoid detection
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
return driver
except WebDriverException as e:
logger.error(f"Failed to setup Chrome driver: {e}")
raise
def start_browser(self):
"""Start the browser session"""
if self.driver is None:
logger.info("Starting Chrome browser for MEXC automation")
self.driver = self.setup_chrome_driver()
logger.info("Browser started successfully")
def stop_browser(self):
"""Stop the browser session"""
if self.driver:
logger.info("Stopping browser")
self.driver.quit()
self.driver = None
def navigate_to_mexc_futures(self, symbol: str = "ETH_USDT"):
"""
Navigate to MEXC futures trading page
Args:
symbol: Trading symbol to navigate to
"""
if not self.driver:
self.start_browser()
url = f"https://www.mexc.com/en-GB/futures/{symbol}?type=linear_swap"
logger.info(f"Navigating to MEXC futures: {url}")
self.driver.get(url)
# Wait for page to load
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
logger.info("MEXC futures page loaded")
except TimeoutException:
logger.error("Timeout waiting for MEXC page to load")
def wait_for_login(self, timeout: int = 300) -> bool:
"""
Wait for user to manually log in to MEXC
Args:
timeout: Maximum time to wait for login (seconds)
Returns:
bool: True if login detected, False if timeout
"""
logger.info("Please log in to MEXC manually in the browser window")
logger.info("Waiting for login completion...")
start_time = time.time()
while time.time() - start_time < timeout:
# Check if we can find elements that indicate logged in state
try:
# Look for user-specific elements that appear after login
cookies = self.driver.get_cookies()
# Check for authentication cookies
auth_cookies = ['uc_token', 'u_id']
logged_in_indicators = 0
for cookie in cookies:
if cookie['name'] in auth_cookies and cookie['value']:
logged_in_indicators += 1
if logged_in_indicators >= 2:
logger.info("Login detected!")
self.logged_in = True
return True
except Exception as e:
logger.debug(f"Error checking login status: {e}")
time.sleep(2) # Check every 2 seconds
logger.error(f"Login timeout after {timeout} seconds")
return False
def extract_session_cookies(self) -> Dict[str, str]:
"""
Extract all cookies from current browser session
Returns:
Dictionary of cookie name-value pairs
"""
if not self.driver:
logger.error("Browser not started")
return {}
cookies = {}
try:
browser_cookies = self.driver.get_cookies()
for cookie in browser_cookies:
cookies[cookie['name']] = cookie['value']
logger.info(f"Extracted {len(cookies)} cookies from browser session")
# Log important cookies (without values for security)
important_cookies = ['uc_token', 'u_id', 'x-mxc-fingerprint', 'mexc_fingerprint_visitorId']
for cookie_name in important_cookies:
if cookie_name in cookies:
logger.info(f"Found important cookie: {cookie_name}")
else:
logger.warning(f"Missing important cookie: {cookie_name}")
return cookies
except Exception as e:
logger.error(f"Failed to extract cookies: {e}")
return {}
def monitor_network_requests(self, duration: int = 60) -> List[Dict[str, Any]]:
"""
Monitor network requests for the specified duration
Args:
duration: How long to monitor requests (seconds)
Returns:
List of captured network requests
"""
if not self.driver:
logger.error("Browser not started")
return []
logger.info(f"Starting network monitoring for {duration} seconds")
logger.info("Please perform trading actions in the browser (open/close positions)")
start_time = time.time()
captured_requests = []
while time.time() - start_time < duration:
try:
# Get performance logs (network requests)
logs = self.driver.get_log('performance')
for log in logs:
message = json.loads(log['message'])
# Filter for relevant MEXC API requests
if (message.get('message', {}).get('method') == 'Network.responseReceived'):
response = message['message']['params']['response']
url = response.get('url', '')
# Look for futures API calls
if ('futures.mexc.com' in url or
'ucgateway/captcha_api' in url or
'api/v1/private' in url):
request_data = {
'url': url,
'method': response.get('mimeType', ''),
'status': response.get('status'),
'headers': response.get('headers', {}),
'timestamp': log['timestamp']
}
captured_requests.append(request_data)
logger.info(f"Captured request: {url}")
except Exception as e:
logger.debug(f"Error in network monitoring: {e}")
time.sleep(1)
logger.info(f"Network monitoring complete. Captured {len(captured_requests)} requests")
return captured_requests
def perform_test_trade(self, symbol: str = "ETH_USDT", volume: float = 1.0, leverage: int = 200):
"""
Attempt to perform a test trade to capture the complete request flow
Args:
symbol: Trading symbol
volume: Position size
leverage: Leverage multiplier
"""
if not self.logged_in:
logger.error("Not logged in - cannot perform test trade")
return
logger.info(f"Attempting test trade: {symbol}, Volume: {volume}, Leverage: {leverage}x")
logger.info("This will attempt to click trading interface elements")
try:
# This would need to be implemented based on MEXC's specific UI elements
# For now, just wait and let user perform manual actions
logger.info("Please manually place a small test trade while monitoring is active")
time.sleep(30)
except Exception as e:
logger.error(f"Error during test trade: {e}")
def full_session_capture(self, symbol: str = "ETH_USDT") -> Dict[str, Any]:
"""
Complete session capture workflow
Args:
symbol: Trading symbol to use
Returns:
Dictionary containing cookies and captured requests
"""
logger.info("Starting full MEXC session capture")
try:
# Start browser and navigate to MEXC
self.navigate_to_mexc_futures(symbol)
# Wait for manual login
if not self.wait_for_login():
return {'success': False, 'error': 'Login timeout'}
# Extract session cookies
cookies = self.extract_session_cookies()
if not cookies:
return {'success': False, 'error': 'Failed to extract cookies'}
# Monitor network requests while user performs actions
logger.info("Starting network monitoring - please perform trading actions now")
requests = self.monitor_network_requests(duration=120) # 2 minutes
return {
'success': True,
'cookies': cookies,
'network_requests': requests,
'timestamp': int(time.time())
}
except Exception as e:
logger.error(f"Error in session capture: {e}")
return {'success': False, 'error': str(e)}
finally:
self.stop_browser()
def main():
"""Main function for standalone execution"""
logging.basicConfig(level=logging.INFO)
print("MEXC Browser Automation - Session Capture")
print("This will open a browser window for you to log into MEXC")
print("Make sure you have Chrome browser installed")
automation = MEXCBrowserAutomation(headless=False)
try:
result = automation.full_session_capture()
if result['success']:
print(f"\nSession capture successful!")
print(f"Extracted {len(result['cookies'])} cookies")
print(f"Captured {len(result['network_requests'])} network requests")
# Save results to file
output_file = f"mexc_session_capture_{int(time.time())}.json"
with open(output_file, 'w') as f:
json.dump(result, f, indent=2)
print(f"Results saved to: {output_file}")
else:
print(f"Session capture failed: {result['error']}")
except KeyboardInterrupt:
print("\nSession capture interrupted by user")
except Exception as e:
print(f"Error: {e}")
finally:
automation.stop_browser()
if __name__ == "__main__":
main()

View File

@ -1,525 +0,0 @@
"""
MEXC Futures Web Client
This module implements a web-based client for MEXC futures trading
since their official API doesn't support futures (leverage) trading.
It mimics browser behavior by replicating the exact HTTP requests
that the web interface makes.
"""
import logging
import requests
import time
import json
import hmac
import hashlib
import base64
from typing import Dict, List, Optional, Any
from datetime import datetime
import uuid
from urllib.parse import urlencode
import glob
import os
logger = logging.getLogger(__name__)
class MEXCSessionManager:
def __init__(self):
self.captcha_token = None
def get_captcha_token(self) -> str:
return self.captcha_token if self.captcha_token else ""
def save_captcha_token(self, token: str):
self.captcha_token = token
logger.info("MEXC: Captcha token saved in session manager")
class MEXCFuturesWebClient:
"""
MEXC Futures Web Client that mimics browser behavior for futures trading.
Since MEXC's official API doesn't support futures, this client replicates
the exact HTTP requests made by their web interface.
"""
def __init__(self, api_key: str, api_secret: str, user_id: str, base_url: str = 'https://www.mexc.com', headless: bool = True):
"""
Initialize the MEXC Futures Web Client
Args:
api_key: API key for authentication
api_secret: API secret for authentication
user_id: User ID for authentication
base_url: Base URL for the MEXC website
headless: Whether to run the browser in headless mode
"""
self.api_key = api_key
self.api_secret = api_secret
self.user_id = user_id
self.base_url = base_url
self.is_authenticated = False
self.headless = headless
self.session = requests.Session()
self.session_manager = MEXCSessionManager() # Adding session_manager attribute
self.captcha_url = f'{base_url}/ucgateway/captcha_api'
self.futures_api_url = "https://futures.mexc.com/api/v1"
# Setup default headers that mimic a real browser
self.setup_browser_headers()
def setup_browser_headers(self):
"""Setup default headers that mimic Chrome browser"""
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36',
'Accept': '*/*',
'Accept-Language': 'en-GB,en-US;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate, br',
'sec-ch-ua': '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'Cache-Control': 'no-cache',
'Pragma': 'no-cache',
'Referer': f'{self.base_url}/en-GB/futures/ETH_USDT?type=linear_swap',
'Language': 'English',
'X-Language': 'en-GB',
'trochilus-trace-id': f"{uuid.uuid4()}-{int(time.time() * 1000) % 10000:04d}",
'trochilus-uid': str(self.user_id) if self.user_id is not None else ''
})
def load_session_cookies(self, cookies: Dict[str, str]):
"""
Load session cookies from browser
Args:
cookies: Dictionary of cookie name-value pairs
"""
for name, value in cookies.items():
self.session.cookies.set(name, value)
# Extract important session info from cookies
self.auth_token = cookies.get('uc_token')
self.user_id = cookies.get('u_id')
self.fingerprint = cookies.get('x-mxc-fingerprint')
self.visitor_id = cookies.get('mexc_fingerprint_visitorId')
if self.auth_token and self.user_id:
self.is_authenticated = True
logger.info("MEXC: Loaded authenticated session")
else:
logger.warning("MEXC: Session cookies incomplete - authentication may fail")
def extract_cookies_from_browser(self, cookie_string: str) -> Dict[str, str]:
"""
Extract cookies from a browser cookie string
Args:
cookie_string: Raw cookie string from browser (copy from Network tab)
Returns:
Dictionary of parsed cookies
"""
cookies = {}
cookie_pairs = cookie_string.split(';')
for pair in cookie_pairs:
if '=' in pair:
name, value = pair.strip().split('=', 1)
cookies[name] = value
return cookies
def verify_captcha(self, symbol: str, side: str, leverage: str) -> bool:
"""
Verify captcha for robot trading protection
Args:
symbol: Trading symbol (e.g., 'ETH_USDT')
side: 'openlong', 'closelong', 'openshort', 'closeshort'
leverage: Leverage string (e.g., '200X')
Returns:
bool: True if captcha verification successful
"""
if not self.is_authenticated:
logger.error("MEXC: Cannot verify captcha - not authenticated")
return False
# Build captcha endpoint URL
endpoint = f"robot.future.{side}.{symbol}.{leverage}"
url = f"{self.captcha_url}/{endpoint}"
# Attempt to get captcha token from session manager
captcha_token = self.session_manager.get_captcha_token()
if not captcha_token:
logger.warning("MEXC: No captcha token available, attempting to fetch from browser")
captcha_token = self._extract_captcha_token_from_browser()
if captcha_token:
self.session_manager.save_captcha_token(captcha_token)
else:
logger.error("MEXC: Failed to extract captcha token from browser")
return False
headers = {
'Content-Type': 'application/json',
'Language': 'en-GB',
'Referer': f'{self.base_url}/en-GB/futures/{symbol}?type=linear_swap',
'trochilus-uid': self.user_id if self.user_id else '',
'trochilus-trace-id': f"{uuid.uuid4()}-{int(time.time() * 1000) % 10000:04d}",
'captcha-token': captcha_token
}
logger.info(f"MEXC: Verifying captcha for {endpoint}")
try:
response = self.session.get(url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
if data.get('success'):
logger.info(f"MEXC: Captcha verified successfully for {endpoint}")
return True
else:
logger.error(f"MEXC: Captcha verification failed for {endpoint}: {data}")
return False
else:
logger.error(f"MEXC: Captcha verification request failed with status {response.status_code}: {response.text}")
return False
except Exception as e:
logger.error(f"MEXC: Captcha verification error for {endpoint}: {str(e)}")
return False
def _extract_captcha_token_from_browser(self) -> str:
"""
Extract captcha token from browser session using stored cookies or requests.
This method looks for the most recent mexc_captcha_tokens JSON file to retrieve a token.
"""
try:
# Look for the most recent mexc_captcha_tokens file
captcha_files = glob.glob("mexc_captcha_tokens_*.json")
if not captcha_files:
logger.error("MEXC: No CAPTCHA token files found")
return ""
# Sort files by timestamp (most recent first)
latest_file = max(captcha_files, key=os.path.getctime)
logger.info(f"MEXC: Using CAPTCHA token file {latest_file}")
with open(latest_file, 'r') as f:
captcha_data = json.load(f)
if captcha_data and isinstance(captcha_data, list) and len(captcha_data) > 0:
# Return the most recent token
return captcha_data[0].get('token', '')
else:
logger.error("MEXC: No valid CAPTCHA tokens found in file")
return ""
except Exception as e:
logger.error(f"MEXC: Error extracting captcha token from browser data: {str(e)}")
return ""
def generate_signature(self, method: str, path: str, params: Dict[str, Any],
timestamp: int, nonce: int) -> str:
"""
Generate signature for MEXC futures API requests
This is reverse-engineered from the browser requests
"""
# This is a placeholder - the actual signature generation would need
# to be reverse-engineered from the browser's JavaScript
# For now, return empty string and rely on cookie authentication
return ""
def open_long_position(self, symbol: str, volume: float, leverage: int = 200,
price: Optional[float] = None) -> Dict[str, Any]:
"""
Open a long futures position
Args:
symbol: Trading symbol (e.g., 'ETH_USDT')
volume: Position size (contracts)
leverage: Leverage multiplier (default 200)
price: Limit price (None for market order)
Returns:
dict: Order response with order ID
"""
if not self.is_authenticated:
logger.error("MEXC: Cannot open position - not authenticated")
return {'success': False, 'error': 'Not authenticated'}
# First verify captcha
if not self.verify_captcha(symbol, 'openlong', f'{leverage}X'):
logger.error("MEXC: Captcha verification failed for opening long position")
return {'success': False, 'error': 'Captcha verification failed'}
# Prepare order parameters based on the request dump
timestamp = int(time.time() * 1000)
nonce = timestamp
order_data = {
'symbol': symbol,
'side': 1, # 1 = long, 2 = short
'openType': 2, # Open position
'type': '5', # Market order (might be '1' for limit)
'vol': volume,
'leverage': leverage,
'marketCeiling': False,
'priceProtect': '0',
'ts': timestamp,
'mhash': self._generate_mhash(), # This needs to be implemented
'mtoken': self.visitor_id
}
# Add price for limit orders
if price is not None:
order_data['price'] = price
order_data['type'] = '1' # Limit order
# Add encrypted parameters (these would need proper implementation)
order_data['p0'] = self._encrypt_p0(order_data) # Placeholder
order_data['k0'] = self._encrypt_k0(order_data) # Placeholder
order_data['chash'] = self._generate_chash(order_data) # Placeholder
# Setup headers for the order request
headers = {
'Authorization': self.auth_token,
'Content-Type': 'application/json',
'Language': 'English',
'x-language': 'en-GB',
'x-mxc-nonce': str(nonce),
'x-mxc-sign': self.generate_signature('POST', '/private/order/create', order_data, timestamp, nonce),
'trochilus-uid': self.user_id,
'trochilus-trace-id': f"{uuid.uuid4()}-{int(time.time() * 1000) % 10000:04d}",
'Referer': 'https://www.mexc.com/'
}
# Make the order request
url = f"{self.futures_api_url}/private/order/create"
try:
# First make OPTIONS request (preflight)
options_response = self.session.options(url, headers=headers, timeout=10)
if options_response.status_code == 200:
# Now make the actual POST request
response = self.session.post(url, json=order_data, headers=headers, timeout=15)
if response.status_code == 200:
data = response.json()
if data.get('success') and data.get('code') == 0:
order_id = data.get('data', {}).get('orderId')
logger.info(f"MEXC: Long position opened successfully - Order ID: {order_id}")
return {
'success': True,
'order_id': order_id,
'timestamp': data.get('data', {}).get('ts'),
'symbol': symbol,
'side': 'long',
'volume': volume,
'leverage': leverage
}
else:
logger.error(f"MEXC: Order failed: {data}")
return {'success': False, 'error': data.get('msg', 'Unknown error')}
else:
logger.error(f"MEXC: Order request failed with status {response.status_code}")
return {'success': False, 'error': f'HTTP {response.status_code}'}
else:
logger.error(f"MEXC: OPTIONS preflight failed with status {options_response.status_code}")
return {'success': False, 'error': f'Preflight failed: HTTP {options_response.status_code}'}
except Exception as e:
logger.error(f"MEXC: Order execution error: {e}")
return {'success': False, 'error': str(e)}
def close_long_position(self, symbol: str, volume: float, leverage: int = 200,
price: Optional[float] = None) -> Dict[str, Any]:
"""
Close a long futures position
Args:
symbol: Trading symbol (e.g., 'ETH_USDT')
volume: Position size to close (contracts)
leverage: Leverage multiplier
price: Limit price (None for market order)
Returns:
dict: Order response
"""
if not self.is_authenticated:
logger.error("MEXC: Cannot close position - not authenticated")
return {'success': False, 'error': 'Not authenticated'}
# First verify captcha
if not self.verify_captcha(symbol, 'closelong', f'{leverage}X'):
logger.error("MEXC: Captcha verification failed for closing long position")
return {'success': False, 'error': 'Captcha verification failed'}
# Similar to open_long_position but with closeType instead of openType
timestamp = int(time.time() * 1000)
nonce = timestamp
order_data = {
'symbol': symbol,
'side': 2, # Close side is opposite
'closeType': 1, # Close position
'type': '5', # Market order
'vol': volume,
'leverage': leverage,
'marketCeiling': False,
'priceProtect': '0',
'ts': timestamp,
'mhash': self._generate_mhash(),
'mtoken': self.visitor_id
}
if price is not None:
order_data['price'] = price
order_data['type'] = '1'
order_data['p0'] = self._encrypt_p0(order_data)
order_data['k0'] = self._encrypt_k0(order_data)
order_data['chash'] = self._generate_chash(order_data)
return self._execute_order(order_data, 'close_long')
def open_short_position(self, symbol: str, volume: float, leverage: int = 200,
price: Optional[float] = None) -> Dict[str, Any]:
"""Open a short futures position"""
if not self.verify_captcha(symbol, 'openshort', f'{leverage}X'):
return {'success': False, 'error': 'Captcha verification failed'}
order_data = {
'symbol': symbol,
'side': 2, # 2 = short
'openType': 2,
'type': '5',
'vol': volume,
'leverage': leverage,
'marketCeiling': False,
'priceProtect': '0',
'ts': int(time.time() * 1000),
'mhash': self._generate_mhash(),
'mtoken': self.visitor_id
}
if price is not None:
order_data['price'] = price
order_data['type'] = '1'
order_data['p0'] = self._encrypt_p0(order_data)
order_data['k0'] = self._encrypt_k0(order_data)
order_data['chash'] = self._generate_chash(order_data)
return self._execute_order(order_data, 'open_short')
def close_short_position(self, symbol: str, volume: float, leverage: int = 200,
price: Optional[float] = None) -> Dict[str, Any]:
"""Close a short futures position"""
if not self.verify_captcha(symbol, 'closeshort', f'{leverage}X'):
return {'success': False, 'error': 'Captcha verification failed'}
order_data = {
'symbol': symbol,
'side': 1, # Close side is opposite
'closeType': 1,
'type': '5',
'vol': volume,
'leverage': leverage,
'marketCeiling': False,
'priceProtect': '0',
'ts': int(time.time() * 1000),
'mhash': self._generate_mhash(),
'mtoken': self.visitor_id
}
if price is not None:
order_data['price'] = price
order_data['type'] = '1'
order_data['p0'] = self._encrypt_p0(order_data)
order_data['k0'] = self._encrypt_k0(order_data)
order_data['chash'] = self._generate_chash(order_data)
return self._execute_order(order_data, 'close_short')
def _execute_order(self, order_data: Dict[str, Any], action: str) -> Dict[str, Any]:
"""Common order execution logic"""
timestamp = order_data['ts']
nonce = timestamp
headers = {
'Authorization': self.auth_token,
'Content-Type': 'application/json',
'Language': 'English',
'x-language': 'en-GB',
'x-mxc-nonce': str(nonce),
'x-mxc-sign': self.generate_signature('POST', '/private/order/create', order_data, timestamp, nonce),
'trochilus-uid': self.user_id,
'trochilus-trace-id': f"{uuid.uuid4()}-{int(time.time() * 1000) % 10000:04d}",
'Referer': 'https://www.mexc.com/'
}
url = f"{self.futures_api_url}/private/order/create"
try:
response = self.session.post(url, json=order_data, headers=headers, timeout=15)
if response.status_code == 200:
data = response.json()
if data.get('success') and data.get('code') == 0:
order_id = data.get('data', {}).get('orderId')
logger.info(f"MEXC: {action} executed successfully - Order ID: {order_id}")
return {
'success': True,
'order_id': order_id,
'timestamp': data.get('data', {}).get('ts'),
'action': action
}
else:
logger.error(f"MEXC: {action} failed: {data}")
return {'success': False, 'error': data.get('msg', 'Unknown error')}
else:
logger.error(f"MEXC: {action} request failed with status {response.status_code}")
return {'success': False, 'error': f'HTTP {response.status_code}'}
except Exception as e:
logger.error(f"MEXC: {action} execution error: {e}")
return {'success': False, 'error': str(e)}
# Placeholder methods for encryption/hashing - these need proper implementation
def _generate_mhash(self) -> str:
"""Generate mhash parameter (needs reverse engineering)"""
return "a0015441fd4c3b6ba427b894b76cb7dd" # Placeholder from request dump
def _encrypt_p0(self, order_data: Dict[str, Any]) -> str:
"""Encrypt p0 parameter (needs reverse engineering)"""
return "placeholder_p0_encryption" # This needs proper implementation
def _encrypt_k0(self, order_data: Dict[str, Any]) -> str:
"""Encrypt k0 parameter (needs reverse engineering)"""
return "placeholder_k0_encryption" # This needs proper implementation
def _generate_chash(self, order_data: Dict[str, Any]) -> str:
"""Generate chash parameter (needs reverse engineering)"""
return "d6c64d28e362f314071b3f9d78ff7494d9cd7177ae0465e772d1840e9f7905d8" # Placeholder
def get_account_info(self) -> Dict[str, Any]:
"""Get account information including positions and balances"""
if not self.is_authenticated:
return {'success': False, 'error': 'Not authenticated'}
# This would need to be implemented by reverse engineering the account info endpoints
logger.info("MEXC: Account info endpoint not yet implemented")
return {'success': False, 'error': 'Not implemented'}
def get_open_positions(self) -> List[Dict[str, Any]]:
"""Get list of open futures positions"""
if not self.is_authenticated:
return []
# This would need to be implemented by reverse engineering the positions endpoint
logger.info("MEXC: Open positions endpoint not yet implemented")
return []

View File

@ -1,259 +0,0 @@
"""
MEXC Session Manager
Helper utilities for managing MEXC web sessions and extracting cookies from browser.
"""
import logging
import json
import re
from typing import Dict, Optional, Any
from pathlib import Path
logger = logging.getLogger(__name__)
class MEXCSessionManager:
"""
Helper class for managing MEXC web sessions and extracting browser cookies
"""
def __init__(self):
self.session_file = Path("mexc_session.json")
def extract_cookies_from_network_tab(self, cookie_header: str) -> Dict[str, str]:
"""
Extract cookies from browser Network tab cookie header
Args:
cookie_header: Raw cookie string from browser (copy from Request Headers)
Returns:
Dictionary of parsed cookies
"""
cookies = {}
# Remove 'Cookie: ' prefix if present
if cookie_header.startswith('Cookie: '):
cookie_header = cookie_header[8:]
elif cookie_header.startswith('cookie: '):
cookie_header = cookie_header[8:]
# Split by semicolon and parse each cookie
cookie_pairs = cookie_header.split(';')
for pair in cookie_pairs:
pair = pair.strip()
if '=' in pair:
name, value = pair.split('=', 1)
cookies[name.strip()] = value.strip()
logger.info(f"Extracted {len(cookies)} cookies from browser")
return cookies
def validate_session_cookies(self, cookies: Dict[str, str]) -> bool:
"""
Validate that essential cookies are present for authentication
Args:
cookies: Dictionary of cookie name-value pairs
Returns:
bool: True if cookies appear valid for authentication
"""
required_cookies = [
'uc_token', # User authentication token
'u_id', # User ID
'x-mxc-fingerprint', # Browser fingerprint
'mexc_fingerprint_visitorId' # Visitor ID
]
missing_cookies = []
for cookie_name in required_cookies:
if cookie_name not in cookies or not cookies[cookie_name]:
missing_cookies.append(cookie_name)
if missing_cookies:
logger.warning(f"Missing required cookies: {missing_cookies}")
return False
logger.info("All required cookies are present")
return True
def save_session(self, cookies: Dict[str, str], metadata: Optional[Dict[str, Any]] = None):
"""
Save session cookies to file for reuse
Args:
cookies: Dictionary of cookies to save
metadata: Optional metadata about the session
"""
session_data = {
'cookies': cookies,
'metadata': metadata or {},
'timestamp': int(time.time())
}
try:
with open(self.session_file, 'w') as f:
json.dump(session_data, f, indent=2)
logger.info(f"Session saved to {self.session_file}")
except Exception as e:
logger.error(f"Failed to save session: {e}")
def load_session(self) -> Optional[Dict[str, str]]:
"""
Load session cookies from file
Returns:
Dictionary of cookies if successful, None otherwise
"""
if not self.session_file.exists():
logger.info("No saved session found")
return None
try:
with open(self.session_file, 'r') as f:
session_data = json.load(f)
cookies = session_data.get('cookies', {})
timestamp = session_data.get('timestamp', 0)
# Check if session is too old (24 hours)
import time
if time.time() - timestamp > 24 * 3600:
logger.warning("Saved session is too old (>24h), may be expired")
if self.validate_session_cookies(cookies):
logger.info("Loaded valid session from file")
return cookies
else:
logger.warning("Loaded session has invalid cookies")
return None
except Exception as e:
logger.error(f"Failed to load session: {e}")
return None
def extract_from_curl_command(self, curl_command: str) -> Dict[str, str]:
"""
Extract cookies from a curl command copied from browser
Args:
curl_command: Complete curl command from browser "Copy as cURL"
Returns:
Dictionary of extracted cookies
"""
cookies = {}
# Find cookie header in curl command
cookie_match = re.search(r'-H [\'"]cookie: ([^\'"]+)[\'"]', curl_command, re.IGNORECASE)
if not cookie_match:
cookie_match = re.search(r'--header [\'"]cookie: ([^\'"]+)[\'"]', curl_command, re.IGNORECASE)
if cookie_match:
cookie_header = cookie_match.group(1)
cookies = self.extract_cookies_from_network_tab(cookie_header)
logger.info(f"Extracted {len(cookies)} cookies from curl command")
else:
logger.warning("No cookie header found in curl command")
return cookies
def print_cookie_extraction_guide(self):
"""Print instructions for extracting cookies from browser"""
print("\n" + "="*80)
print("MEXC COOKIE EXTRACTION GUIDE")
print("="*80)
print("""
To extract cookies from your browser for MEXC futures trading:
METHOD 1: Browser Network Tab
1. Open MEXC futures page and log in: https://www.mexc.com/en-GB/futures/ETH_USDT
2. Open browser Developer Tools (F12)
3. Go to Network tab
4. Try to place a small futures trade (it will fail, but we need the request)
5. Find the request to 'futures.mexc.com' in the Network tab
6. Right-click on the request -> Copy -> Copy request headers
7. Find the 'Cookie:' line and copy everything after 'Cookie: '
METHOD 2: Copy as cURL
1. Follow steps 1-5 above
2. Right-click on the futures API request -> Copy -> Copy as cURL
3. Paste the entire cURL command
METHOD 3: Manual Cookie Extraction
1. While logged into MEXC, press F12 -> Application/Storage tab
2. On the left, expand 'Cookies' -> click on 'https://www.mexc.com'
3. Copy the values for these important cookies:
- uc_token
- u_id
- x-mxc-fingerprint
- mexc_fingerprint_visitorId
IMPORTANT NOTES:
- Cookies expire after some time (usually 24 hours)
- You must be logged into MEXC futures (not just spot trading)
- Keep your cookies secure - they provide access to your account
- Test with small amounts first
Example usage:
session_manager = MEXCSessionManager()
# Method 1: From cookie header
cookie_header = "uc_token=ABC123; u_id=DEF456; ..."
cookies = session_manager.extract_cookies_from_network_tab(cookie_header)
# Method 2: From cURL command
curl_cmd = "curl 'https://futures.mexc.com/...' -H 'cookie: uc_token=ABC123...'"
cookies = session_manager.extract_from_curl_command(curl_cmd)
# Save session for reuse
session_manager.save_session(cookies)
""")
print("="*80)
if __name__ == "__main__":
# When run directly, show the extraction guide
import time
manager = MEXCSessionManager()
manager.print_cookie_extraction_guide()
print("\nWould you like to:")
print("1. Load saved session")
print("2. Extract cookies from clipboard")
print("3. Exit")
choice = input("\nEnter choice (1-3): ").strip()
if choice == "1":
cookies = manager.load_session()
if cookies:
print(f"\nLoaded {len(cookies)} cookies from saved session")
if manager.validate_session_cookies(cookies):
print("Session appears valid for trading")
else:
print("Warning: Session may be incomplete or expired")
else:
print("No valid saved session found")
elif choice == "2":
print("\nPaste your cookie header or cURL command:")
user_input = input().strip()
if user_input.startswith('curl'):
cookies = manager.extract_from_curl_command(user_input)
else:
cookies = manager.extract_cookies_from_network_tab(user_input)
if cookies and manager.validate_session_cookies(cookies):
print(f"\nSuccessfully extracted {len(cookies)} valid cookies")
save = input("Save session for reuse? (y/n): ").strip().lower()
if save == 'y':
manager.save_session(cookies)
else:
print("Failed to extract valid cookies")
else:
print("Goodbye!")

View File

@ -1,346 +0,0 @@
#!/usr/bin/env python3
"""
Test MEXC Futures Web Client
This script demonstrates how to use the MEXC Futures Web Client
for futures trading that isn't supported by their official API.
IMPORTANT: This requires extracting cookies from your browser session.
"""
import logging
import sys
import os
import time
import json
import uuid
# Add the project root to path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from mexc_futures_client import MEXCFuturesWebClient
from session_manager import MEXCSessionManager
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Constants
SYMBOL = "ETH_USDT"
LEVERAGE = 300
CREDENTIALS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mexc_credentials.json')
# Read credentials from mexc_credentials.json in JSON format
def load_credentials():
credentials_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'mexc_credentials.json')
cookies = {}
captcha_token_open = ''
captcha_token_close = ''
try:
with open(credentials_file, 'r') as f:
data = json.load(f)
cookies = data.get('credentials', {}).get('cookies', {})
captcha_token_open = data.get('credentials', {}).get('captcha_token_open', '')
captcha_token_close = data.get('credentials', {}).get('captcha_token_close', '')
logger.info(f"Loaded credentials from {credentials_file}")
except Exception as e:
logger.error(f"Error loading credentials: {e}")
return cookies, captcha_token_open, captcha_token_close
def test_basic_connection():
"""Test basic connection and authentication"""
logger.info("Testing MEXC Futures Web Client")
# Initialize session manager
session_manager = MEXCSessionManager()
# Try to load saved session first
cookies = session_manager.load_session()
if not cookies:
# Explicitly load the cookies from the file we have
cookies_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'mexc_cookies_20250703_003625.json')
if os.path.exists(cookies_file):
try:
with open(cookies_file, 'r') as f:
cookies = json.load(f)
logger.info(f"Loaded cookies from {cookies_file}")
except Exception as e:
logger.error(f"Failed to load cookies from {cookies_file}: {e}")
cookies = None
else:
logger.error(f"Cookies file not found at {cookies_file}")
cookies = None
if not cookies:
print("\nNo saved session found. You need to extract cookies from your browser.")
session_manager.print_cookie_extraction_guide()
print("\nPaste your cookie header or cURL command (or press Enter to exit):")
user_input = input().strip()
if not user_input:
print("No input provided. Exiting.")
return False
# Extract cookies from user input
if user_input.startswith('curl'):
cookies = session_manager.extract_from_curl_command(user_input)
else:
cookies = session_manager.extract_cookies_from_network_tab(user_input)
if not cookies:
logger.error("Failed to extract cookies from input")
return False
# Validate and save session
if session_manager.validate_session_cookies(cookies):
session_manager.save_session(cookies)
logger.info("Session saved for future use")
else:
logger.warning("Extracted cookies may be incomplete")
# Initialize the web client
client = MEXCFuturesWebClient(api_key='', api_secret='', user_id='', base_url='https://www.mexc.com', headless=True)
# Load cookies into the client's session
for name, value in cookies.items():
client.session.cookies.set(name, value)
# Update headers to include additional parameters from captured requests
client.session.headers.update({
'trochilus-trace-id': f"{uuid.uuid4()}-{int(time.time() * 1000) % 10000:04d}",
'trochilus-uid': cookies.get('u_id', ''),
'Referer': 'https://www.mexc.com/en-GB/futures/ETH_USDT?type=linear_swap',
'Language': 'English',
'X-Language': 'en-GB'
})
if not client.is_authenticated:
logger.error("Failed to authenticate with extracted cookies")
return False
logger.info("Successfully authenticated with MEXC")
logger.info(f"User ID: {client.user_id}")
logger.info(f"Auth Token: {client.auth_token[:20]}..." if client.auth_token else "No auth token")
return True
def test_captcha_verification(client: MEXCFuturesWebClient):
"""Test captcha verification system"""
logger.info("Testing captcha verification...")
# Test captcha for ETH_USDT long position with 200x leverage
success = client.verify_captcha('ETH_USDT', 'openlong', '200X')
if success:
logger.info("Captcha verification successful")
else:
logger.warning("Captcha verification failed - this may be normal if no position is being opened")
return success
def test_position_opening(client: MEXCFuturesWebClient, dry_run: bool = True):
"""Test opening a position (dry run by default)"""
if dry_run:
logger.info("DRY RUN: Testing position opening (no actual trade)")
else:
logger.warning("LIVE TRADING: Opening actual position!")
symbol = 'ETH_USDT'
volume = 1 # Small test position
leverage = 200
logger.info(f"Attempting to open long position: {symbol}, Volume: {volume}, Leverage: {leverage}x")
if not dry_run:
result = client.open_long_position(symbol, volume, leverage)
if result['success']:
logger.info(f"Position opened successfully!")
logger.info(f"Order ID: {result['order_id']}")
logger.info(f"Timestamp: {result['timestamp']}")
return True
else:
logger.error(f"Failed to open position: {result['error']}")
return False
else:
logger.info("DRY RUN: Would attempt to open position here")
# Test just the captcha verification part
return client.verify_captcha(symbol, 'openlong', f'{leverage}X')
def test_position_opening_live(client):
symbol = "ETH_USDT"
volume = 1 # Small volume for testing
leverage = 200
logger.info(f"LIVE TRADING: Opening actual position!")
logger.info(f"Attempting to open long position: {symbol}, Volume: {volume}, Leverage: {leverage}x")
result = client.open_long_position(symbol, volume, leverage)
if result.get('success'):
logger.info(f"Successfully opened position: {result}")
else:
logger.error(f"Failed to open position: {result.get('error', 'Unknown error')}")
def interactive_menu(client: MEXCFuturesWebClient):
"""Interactive menu for testing different functions"""
while True:
print("\n" + "="*50)
print("MEXC Futures Web Client Test Menu")
print("="*50)
print("1. Test captcha verification")
print("2. Test position opening (DRY RUN)")
print("3. Test position opening (LIVE - BE CAREFUL!)")
print("4. Test position closing (DRY RUN)")
print("5. Show session info")
print("6. Refresh session")
print("0. Exit")
choice = input("\nEnter choice (0-6): ").strip()
if choice == "1":
test_captcha_verification(client)
elif choice == "2":
test_position_opening(client, dry_run=True)
elif choice == "3":
test_position_opening_live(client)
elif choice == "4":
logger.info("DRY RUN: Position closing test")
success = client.verify_captcha('ETH_USDT', 'closelong', '200X')
if success:
logger.info("DRY RUN: Would close position here")
else:
logger.warning("Captcha verification failed for position closing")
elif choice == "5":
print(f"\nSession Information:")
print(f"Authenticated: {client.is_authenticated}")
print(f"User ID: {client.user_id}")
print(f"Auth Token: {client.auth_token[:20]}..." if client.auth_token else "None")
print(f"Fingerprint: {client.fingerprint}")
print(f"Visitor ID: {client.visitor_id}")
elif choice == "6":
session_manager = MEXCSessionManager()
session_manager.print_cookie_extraction_guide()
elif choice == "0":
print("Goodbye!")
break
else:
print("Invalid choice. Please try again.")
def main():
"""Main test function"""
print("MEXC Futures Web Client Test")
print("WARNING: This is experimental software for futures trading")
print("Use at your own risk and test with small amounts first!")
# Load cookies and tokens
cookies, captcha_token_open, captcha_token_close = load_credentials()
if not cookies:
logger.error("Failed to load cookies from credentials file")
sys.exit(1)
# Initialize client with loaded cookies and tokens
client = MEXCFuturesWebClient(api_key='', api_secret='', user_id='')
# Load cookies into the client's session
for name, value in cookies.items():
client.session.cookies.set(name, value)
# Set captcha tokens
client.captcha_token_open = captcha_token_open
client.captcha_token_close = captcha_token_close
# Try to load credentials from the new JSON file
try:
with open(CREDENTIALS_FILE, 'r') as f:
credentials_data = json.load(f)
cookies = credentials_data['credentials']['cookies']
captcha_token_open = credentials_data['credentials']['captcha_token_open']
captcha_token_close = credentials_data['credentials']['captcha_token_close']
client.load_session_cookies(cookies)
client.session_manager.save_captcha_token(captcha_token_open) # Assuming this is for opening
except FileNotFoundError:
logger.error(f"Credentials file not found at {CREDENTIALS_FILE}")
return False
except json.JSONDecodeError as e:
logger.error(f"Error loading credentials: {e}")
return False
except KeyError as e:
logger.error(f"Missing key in credentials file: {e}")
return False
if not client.is_authenticated:
logger.error("Client not authenticated. Please ensure valid cookies and tokens are in mexc_credentials.json")
return False
# Test connection and authentication
logger.info("Successfully authenticated with MEXC")
# Set leverage
leverage_response = client.update_leverage(symbol=SYMBOL, leverage=LEVERAGE)
if leverage_response and leverage_response.get('code') == 200:
logger.info(f"Leverage set to {LEVERAGE}x for {SYMBOL}")
else:
logger.error(f"Failed to set leverage: {leverage_response}")
sys.exit(1)
# Get current price
ticker = client.get_ticker_data(symbol=SYMBOL)
if ticker and ticker.get('code') == 200:
current_price = float(ticker['data']['last'])
logger.info(f"Current {SYMBOL} price: {current_price}")
else:
logger.error(f"Failed to get ticker data: {ticker}")
sys.exit(1)
# Calculate order size for a small test trade (e.g., $10 worth)
trade_usdt = 10.0
order_qty = round((trade_usdt / current_price) * LEVERAGE, 3)
logger.info(f"Calculated order quantity: {order_qty} {SYMBOL} for ~${trade_usdt} at {LEVERAGE}x")
# Test 1: Open LONG position
logger.info(f"Opening LONG position for {SYMBOL} at {current_price} with qty {order_qty}")
open_long_order = client.create_order(
symbol=SYMBOL,
side=1, # 1 for BUY
position_side=1, # 1 for LONG
order_type=1, # 1 for LIMIT
price=current_price,
vol=order_qty
)
if open_long_order and open_long_order.get('code') == 200:
logger.info(f"✅ Successfully opened LONG position: {open_long_order['data']}")
else:
logger.error(f"❌ Failed to open LONG position: {open_long_order}")
sys.exit(1)
# Test 2: Close LONG position
logger.info(f"Closing LONG position for {SYMBOL}")
close_long_order = client.create_order(
symbol=SYMBOL,
side=2, # 2 for SELL
position_side=1, # 1 for LONG
order_type=1, # 1 for LIMIT
price=current_price,
vol=order_qty,
reduce_only=True
)
if close_long_order and close_long_order.get('code') == 200:
logger.info(f"✅ Successfully closed LONG position: {close_long_order['data']}")
else:
logger.error(f"❌ Failed to close LONG position: {close_long_order}")
sys.exit(1)
logger.info("All tests completed successfully!")
if __name__ == "__main__":
main()