gogo2/crypto/gogo2/mexc_tick_stream.py
2025-03-17 01:56:07 +02:00

240 lines
9.9 KiB
Python

import os
import json
import asyncio
import logging
import datetime
import numpy as np
import pandas as pd
import websockets
from dotenv import load_dotenv
from torch.utils.tensorboard import SummaryWriter
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.FileHandler("mexc_tick_stream.log"), logging.StreamHandler()]
)
logger = logging.getLogger("mexc_tick_stream")
# Load environment variables
load_dotenv()
MEXC_API_KEY = os.getenv('MEXC_API_KEY')
MEXC_SECRET_KEY = os.getenv('MEXC_SECRET_KEY')
class MexcTickStreamer:
def __init__(self, symbol="ETH/USDT", update_interval=1.0):
"""
Initialize the MEXC tick data streamer
Args:
symbol: Trading pair symbol (e.g., "ETH/USDT")
update_interval: How often to update the TensorBoard visualization (in seconds)
"""
self.symbol = symbol.replace("/", "").upper() # Convert to MEXC format (e.g., ETHUSDT)
self.update_interval = update_interval
self.uri = "wss://wbs-api.mexc.com/ws"
self.writer = SummaryWriter(f'runs/mexc_ticks_{self.symbol}')
self.trades = []
self.last_update_time = 0
self.running = False
# For visualization
self.price_history = []
self.volume_history = []
self.buy_volume = 0
self.sell_volume = 0
self.step = 0
async def connect(self):
"""Connect to MEXC WebSocket and subscribe to tick data"""
try:
self.websocket = await websockets.connect(self.uri)
logger.info(f"Connected to MEXC WebSocket for {self.symbol}")
# Subscribe to trade stream (using non-protobuf endpoint for simplicity)
subscribe_msg = {
"method": "SUBSCRIPTION",
"params": [f"spot@public.deals.v3.api@{self.symbol}"]
}
await self.websocket.send(json.dumps(subscribe_msg))
logger.info(f"Subscribed to {self.symbol} tick data")
# Start ping task to keep connection alive
asyncio.create_task(self.ping_loop())
return True
except Exception as e:
logger.error(f"Error connecting to MEXC WebSocket: {e}")
return False
async def ping_loop(self):
"""Send ping messages to keep the connection alive"""
while self.running:
try:
await self.websocket.send(json.dumps({"method": "PING"}))
await asyncio.sleep(30) # Send ping every 30 seconds
except Exception as e:
logger.error(f"Error in ping loop: {e}")
break
async def process_message(self, message):
"""Process incoming WebSocket messages"""
try:
# Try to parse as JSON
try:
data = json.loads(message)
# Handle PONG response
if data.get("msg") == "PONG":
return
# Handle subscription confirmation
if data.get("code") == 0:
logger.info(f"Subscription confirmed: {data.get('msg')}")
return
# Handle trade data in the non-protobuf format
if "c" in data and "d" in data and "deals" in data["d"]:
for trade in data["d"]["deals"]:
# Extract trade data
price = float(trade["p"])
quantity = float(trade["v"])
trade_type = 1 if trade["S"] == 1 else 2 # 1 for buy, 2 for sell
timestamp = trade["t"]
# Store trade data
self.trades.append({
"price": price,
"quantity": quantity,
"type": "buy" if trade_type == 1 else "sell",
"timestamp": timestamp
})
# Update volume counters
if trade_type == 1: # Buy
self.buy_volume += quantity
else: # Sell
self.sell_volume += quantity
# Store for visualization
self.price_history.append(price)
self.volume_history.append(quantity)
# Limit history size to prevent memory issues
if len(self.price_history) > 10000:
self.price_history = self.price_history[-5000:]
self.volume_history = self.volume_history[-5000:]
# Update TensorBoard if enough time has passed
current_time = datetime.datetime.now().timestamp()
if current_time - self.last_update_time >= self.update_interval:
await self.update_tensorboard()
self.last_update_time = current_time
except json.JSONDecodeError:
# If it's not valid JSON, it might be binary protobuf data
logger.debug("Received binary data, skipping (protobuf not implemented)")
except Exception as e:
logger.error(f"Error processing message: {e}")
async def update_tensorboard(self):
"""Update TensorBoard visualizations"""
try:
if not self.price_history:
return
# Calculate metrics
current_price = self.price_history[-1]
avg_price = np.mean(self.price_history[-100:]) if len(self.price_history) >= 100 else np.mean(self.price_history)
price_std = np.std(self.price_history[-100:]) if len(self.price_history) >= 100 else np.std(self.price_history)
# Calculate VWAP (Volume Weighted Average Price)
if len(self.price_history) >= 100 and len(self.volume_history) >= 100:
vwap = np.sum(np.array(self.price_history[-100:]) * np.array(self.volume_history[-100:])) / np.sum(self.volume_history[-100:])
else:
vwap = np.sum(np.array(self.price_history) * np.array(self.volume_history)) / np.sum(self.volume_history) if np.sum(self.volume_history) > 0 else current_price
# Calculate buy/sell ratio
total_volume = self.buy_volume + self.sell_volume
buy_ratio = self.buy_volume / total_volume if total_volume > 0 else 0.5
# Log to TensorBoard
self.writer.add_scalar('Price/Current', current_price, self.step)
self.writer.add_scalar('Price/VWAP', vwap, self.step)
self.writer.add_scalar('Price/StdDev', price_std, self.step)
self.writer.add_scalar('Volume/BuyRatio', buy_ratio, self.step)
self.writer.add_scalar('Volume/Total', total_volume, self.step)
# Create a candlestick-like chart for the last 100 ticks
if len(self.price_history) >= 100:
prices = np.array(self.price_history[-100:])
self.writer.add_histogram('Price/Distribution', prices, self.step)
# Create a custom scalars panel
layout = {
"Price": {
"Current vs VWAP": ["Multiline", ["Price/Current", "Price/VWAP"]],
},
"Volume": {
"Buy Ratio": ["Multiline", ["Volume/BuyRatio"]],
}
}
self.writer.add_custom_scalars(layout)
self.step += 1
logger.info(f"Updated TensorBoard: Price={current_price:.2f}, VWAP={vwap:.2f}, Buy Ratio={buy_ratio:.2f}")
except Exception as e:
logger.error(f"Error updating TensorBoard: {e}")
async def run(self):
"""Main loop to receive and process WebSocket messages"""
self.running = True
self.last_update_time = datetime.datetime.now().timestamp()
if not await self.connect():
logger.error("Failed to connect. Exiting.")
return
try:
while self.running:
message = await self.websocket.recv()
await self.process_message(message)
except websockets.exceptions.ConnectionClosed:
logger.warning("WebSocket connection closed")
except Exception as e:
logger.error(f"Error in run loop: {e}")
finally:
self.running = False
await self.cleanup()
async def cleanup(self):
"""Clean up resources"""
try:
if hasattr(self, 'websocket'):
await self.websocket.close()
self.writer.close()
logger.info("Cleaned up resources")
except Exception as e:
logger.error(f"Error during cleanup: {e}")
async def main():
"""Main entry point"""
# Parse command line arguments
import argparse
parser = argparse.ArgumentParser(description='MEXC Tick Data Streamer')
parser.add_argument('--symbol', type=str, default='ETH/USDT', help='Trading pair symbol (e.g., ETH/USDT)')
parser.add_argument('--interval', type=float, default=1.0, help='TensorBoard update interval in seconds')
args = parser.parse_args()
# Create and run the streamer
streamer = MexcTickStreamer(symbol=args.symbol, update_interval=args.interval)
await streamer.run()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Program interrupted by user")
except Exception as e:
logger.error(f"Unhandled exception: {e}")