161 lines
5.9 KiB
Python
161 lines
5.9 KiB
Python
# data/live_data.py
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import time
|
|
from collections import deque
|
|
|
|
import ccxt.async_support as ccxt
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
class LiveDataManager:
|
|
def __init__(self, symbol, exchange_name='mexc', window_size=120):
|
|
load_dotenv() # Load environment variables
|
|
self.symbol = symbol
|
|
self.exchange_name = exchange_name
|
|
self.window_size = window_size
|
|
self.candles = deque(maxlen=window_size)
|
|
self.ticks = deque(maxlen=window_size * 60) # Assuming max 60 ticks per minute
|
|
self.last_candle_time = None
|
|
self.exchange = self._initialize_exchange()
|
|
self.lock = asyncio.Lock() # Lock to prevent race conditions
|
|
|
|
def _initialize_exchange(self):
|
|
exchange_class = getattr(ccxt, self.exchange_name)
|
|
mexc_api_key = os.environ.get('MEXC_API_KEY')
|
|
mexc_api_secret = os.environ.get('MEXC_API_SECRET')
|
|
|
|
if not mexc_api_key or not mexc_api_secret:
|
|
print("API keys not found in environment variables. Using default keys.")
|
|
mexc_api_key = "mx0vglGymMT4iLpHXD"
|
|
mexc_api_secret = "557300a85ae84cf6b927b86278905fd7"
|
|
|
|
return exchange_class({
|
|
'apiKey': mexc_api_key,
|
|
'secret': mexc_api_secret,
|
|
'enableRateLimit': True,
|
|
})
|
|
|
|
async def _fetch_initial_candles(self):
|
|
print(f"Fetching initial candles for {self.symbol}...")
|
|
now = int(time.time() * 1000)
|
|
since = now - self.window_size * 60 * 1000
|
|
try:
|
|
candles = await self.exchange.fetch_ohlcv(self.symbol, '1m', since=since, limit=self.window_size)
|
|
for candle in candles:
|
|
self.candles.append(self._format_candle(candle))
|
|
if candles:
|
|
self.last_candle_time = candles[-1][0]
|
|
print(f"Fetched {len(candles)} initial candles.")
|
|
except Exception as e:
|
|
print(f"Error fetching initial candles: {e}")
|
|
|
|
def _format_candle(self, candle_data):
|
|
return {
|
|
'timestamp': candle_data[0],
|
|
'open': float(candle_data[1]),
|
|
'high': float(candle_data[2]),
|
|
'low': float(candle_data[3]),
|
|
'close': float(candle_data[4]),
|
|
'volume': float(candle_data[5])
|
|
}
|
|
|
|
def _format_tick(self, tick_data):
|
|
# Check if 's' (symbol) is present, otherwise return None
|
|
if 's' not in tick_data:
|
|
return None
|
|
|
|
return {
|
|
'timestamp': tick_data['E'],
|
|
'symbol': tick_data['s'],
|
|
'price': float(tick_data['p']),
|
|
'quantity': float(tick_data['q'])
|
|
}
|
|
|
|
async def _update_candle(self, tick):
|
|
async with self.lock:
|
|
if self.last_candle_time is None: # first time
|
|
self.last_candle_time = tick['timestamp'] - (tick['timestamp'] % (60 * 1000))
|
|
new_candle = {
|
|
'timestamp': self.last_candle_time,
|
|
'open': tick['price'],
|
|
'high': tick['price'],
|
|
'low': tick['price'],
|
|
'close': tick['price'],
|
|
'volume': tick['quantity']
|
|
}
|
|
self.candles.append(new_candle)
|
|
|
|
if tick['timestamp'] >= self.last_candle_time + 60 * 1000:
|
|
# Start a new candle
|
|
self.last_candle_time += 60 * 1000
|
|
new_candle = {
|
|
'timestamp': self.last_candle_time,
|
|
'open': tick['price'],
|
|
'high': tick['price'],
|
|
'low': tick['price'],
|
|
'close': tick['price'],
|
|
'volume': tick['quantity']
|
|
}
|
|
self.candles.append(new_candle)
|
|
|
|
else:
|
|
# Update the current candle
|
|
current_candle = self.candles[-1]
|
|
current_candle['high'] = max(current_candle['high'], tick['price'])
|
|
current_candle['low'] = min(current_candle['low'], tick['price'])
|
|
current_candle['close'] = tick['price']
|
|
current_candle['volume'] += tick['quantity']
|
|
self.candles[-1] = current_candle # Reassign to trigger deque update
|
|
|
|
async def fetch_and_process_ticks(self):
|
|
async with self.lock:
|
|
since = None if not self.ticks else self.ticks[-1]['timestamp']
|
|
try:
|
|
# Use fetch_trades (or appropriate method for your exchange) for live ticks.
|
|
ticks = await self.exchange.fetch_trades(self.symbol, since=since)
|
|
for tick in ticks:
|
|
formatted_tick = self._format_tick(tick)
|
|
if formatted_tick: # Add the check here
|
|
self.ticks.append(formatted_tick)
|
|
await self._update_candle(formatted_tick)
|
|
except Exception as e:
|
|
print(f"Error fetching ticks: {e}")
|
|
|
|
async def get_data(self):
|
|
async with self.lock:
|
|
candles_copy = list(self.candles).copy()
|
|
ticks_copy = list(self.ticks).copy()
|
|
return candles_copy, ticks_copy
|
|
|
|
async def close(self):
|
|
await self.exchange.close()
|
|
|
|
|
|
|
|
async def main():
|
|
symbol = 'BTC/USDT'
|
|
manager = LiveDataManager(symbol)
|
|
await manager._fetch_initial_candles()
|
|
|
|
async def print_data():
|
|
while True:
|
|
await manager.fetch_and_process_ticks() # Fetch new ticks continuously
|
|
candles, ticks = await manager.get_data()
|
|
if candles:
|
|
print("Last Candle:", candles[-1])
|
|
if ticks:
|
|
print("Last Tick:", ticks[-1])
|
|
await asyncio.sleep(1) # Print every second
|
|
|
|
try:
|
|
await print_data() # Run the printing task
|
|
except KeyboardInterrupt:
|
|
print("Closing connection...")
|
|
finally:
|
|
await manager.close()
|
|
|
|
if __name__ == '__main__':
|
|
asyncio.run(main())
|