gogo2/crypto/gogo/data/live_data.py
2025-02-12 01:27:38 +02:00

177 lines
7.0 KiB
Python

# data/live_data.py
import asyncio
import json
import os
import time
from collections import deque
import ccxt.async_support as ccxt
from dotenv import load_dotenv
import platform
class LiveDataManager:
def __init__(self, symbol, exchange_name='mexc', window_size=120):
load_dotenv() # Load environment variables
self.symbol = symbol
self.exchange_name = exchange_name
self.window_size = window_size
self.candles = deque(maxlen=window_size)
self.ticks = deque(maxlen=window_size * 60) # Assuming max 60 ticks per minute
self.last_candle_time = None
self.exchange = self._initialize_exchange()
self.lock = asyncio.Lock() # Lock to prevent race conditions
self.is_windows = platform.system() == 'Windows'
def _initialize_exchange(self):
exchange_class = getattr(ccxt, self.exchange_name)
mexc_api_key = os.environ.get('MEXC_API_KEY')
mexc_api_secret = os.environ.get('MEXC_API_SECRET')
if not mexc_api_key or not mexc_api_secret:
print("API keys not found in environment variables. Using default keys.")
mexc_api_key = "mx0vglGymMT4iLpHXD"
mexc_api_secret = "557300a85ae84cf6b927b86278905fd7"
return exchange_class({
'apiKey': mexc_api_key,
'secret': mexc_api_secret,
'enableRateLimit': True,
})
async def _fetch_initial_candles(self):
print(f"Fetching initial candles for {self.symbol}...")
now = int(time.time() * 1000)
since = now - self.window_size * 60 * 1000
retries = 3
for attempt in range(retries):
try:
candles = await self.exchange.fetch_ohlcv(self.symbol, '1m', since=since, limit=self.window_size)
for candle in candles:
self.candles.append(self._format_candle(candle))
if candles:
self.last_candle_time = candles[-1][0]
print(f"Fetched {len(candles)} initial candles.")
return # Exit the function if successful
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
if self.is_windows and "aiodns needs a SelectorEventLoop" in str(e):
print("aiodns issue detected on Windows. This is a known problem with aiodns and ccxt on Windows.")
if attempt < retries - 1:
await asyncio.sleep(5) # Wait before retrying
print("Failed to fetch initial candles after multiple retries.")
def _format_candle(self, candle_data):
return {
'timestamp': candle_data[0],
'open': float(candle_data[1]),
'high': float(candle_data[2]),
'low': float(candle_data[3]),
'close': float(candle_data[4]),
'volume': float(candle_data[5])
}
def _format_tick(self, tick_data):
# Check if 's' (symbol) is present, otherwise return None
if 's' not in tick_data:
return None
return {
'timestamp': tick_data['E'],
'symbol': tick_data['s'],
'price': float(tick_data['p']),
'quantity': float(tick_data['q'])
}
async def _update_candle(self, tick):
async with self.lock:
if self.last_candle_time is None: # first time
self.last_candle_time = tick['timestamp'] - (tick['timestamp'] % (60 * 1000))
new_candle = {
'timestamp': self.last_candle_time,
'open': tick['price'],
'high': tick['price'],
'low': tick['price'],
'close': tick['price'],
'volume': tick['quantity']
}
self.candles.append(new_candle)
if tick['timestamp'] >= self.last_candle_time + 60 * 1000:
# Start a new candle
self.last_candle_time += 60 * 1000
new_candle = {
'timestamp': self.last_candle_time,
'open': tick['price'],
'high': tick['price'],
'low': tick['price'],
'close': tick['price'],
'volume': tick['quantity']
}
self.candles.append(new_candle)
else:
# Update the current candle
current_candle = self.candles[-1]
current_candle['high'] = max(current_candle['high'], tick['price'])
current_candle['low'] = min(current_candle['low'], tick['price'])
current_candle['close'] = tick['price']
current_candle['volume'] += tick['quantity']
self.candles[-1] = current_candle # Reassign to trigger deque update
async def fetch_and_process_ticks(self):
async with self.lock:
since = None if not self.ticks else self.ticks[-1]['timestamp']
retries = 3
for attempt in range(retries):
try:
# Use fetch_trades (or appropriate method for your exchange) for live ticks.
ticks = await self.exchange.fetch_trades(self.symbol, since=since)
for tick in ticks:
formatted_tick = self._format_tick(tick)
if formatted_tick: # Add the check here
self.ticks.append(formatted_tick)
await self._update_candle(formatted_tick)
break # Exit the retry loop if successful
except Exception as e:
print(f"Error fetching ticks (attempt {attempt + 1}): {e}")
if self.is_windows and "aiodns needs a SelectorEventLoop" in str(e):
print("aiodns issue detected on Windows. This is a known problem with aiodns and ccxt on Windows.")
if attempt < retries - 1:
await asyncio.sleep(5) # Wait before retrying
async def get_data(self):
async with self.lock:
candles_copy = list(self.candles).copy()
ticks_copy = list(self.ticks).copy()
return candles_copy, ticks_copy
async def close(self):
await self.exchange.close()
async def main():
symbol = 'BTC/USDT'
manager = LiveDataManager(symbol)
await manager._fetch_initial_candles()
async def print_data():
while True:
await manager.fetch_and_process_ticks() # Fetch new ticks continuously
candles, ticks = await manager.get_data()
if candles:
print("Last Candle:", candles[-1])
if ticks:
print("Last Tick:", ticks[-1])
await asyncio.sleep(1) # Print every second
try:
await print_data() # Run the printing task
except KeyboardInterrupt:
print("Closing connection...")
finally:
await manager.close()
if __name__ == '__main__':
asyncio.run(main())