wip
This commit is contained in:
@ -7,6 +7,7 @@ This module consolidates all data functionality including:
|
||||
- Multi-timeframe candle generation
|
||||
- Caching and data management
|
||||
- Technical indicators calculation
|
||||
- Centralized data distribution to multiple subscribers (AI models, dashboard, etc.)
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
@ -14,23 +15,51 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
import websockets
|
||||
import requests
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
from typing import Dict, List, Optional, Tuple, Any, Callable
|
||||
from dataclasses import dataclass, field
|
||||
import ta
|
||||
from threading import Thread, Lock
|
||||
from collections import deque
|
||||
from dataclasses import dataclass, field
|
||||
import uuid
|
||||
|
||||
from .config import get_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
|
||||
class MarketTick:
|
||||
"""Standardized market tick data structure"""
|
||||
symbol: str
|
||||
timestamp: datetime
|
||||
price: float
|
||||
volume: float
|
||||
quantity: float
|
||||
side: str # 'buy' or 'sell'
|
||||
trade_id: str
|
||||
is_buyer_maker: bool
|
||||
raw_data: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
@dataclass
|
||||
class DataSubscriber:
|
||||
"""Data subscriber information"""
|
||||
subscriber_id: str
|
||||
callback: Callable[[MarketTick], None]
|
||||
symbols: List[str]
|
||||
active: bool = True
|
||||
last_update: datetime = field(default_factory=datetime.now)
|
||||
tick_count: int = 0
|
||||
subscriber_name: str = "unknown"
|
||||
|
||||
class DataProvider:
|
||||
"""Unified data provider for historical and real-time market data"""
|
||||
"""Unified data provider for historical and real-time market data with centralized distribution"""
|
||||
|
||||
def __init__(self, symbols: List[str] = None, timeframes: List[str] = None):
|
||||
"""Initialize the data provider"""
|
||||
@ -48,6 +77,30 @@ class DataProvider:
|
||||
self.is_streaming = False
|
||||
self.data_lock = Lock()
|
||||
|
||||
# Subscriber management for centralized data distribution
|
||||
self.subscribers: Dict[str, DataSubscriber] = {}
|
||||
self.subscriber_lock = Lock()
|
||||
self.tick_buffers: Dict[str, deque] = {}
|
||||
self.buffer_size = 1000 # Keep last 1000 ticks per symbol
|
||||
|
||||
# Initialize tick buffers
|
||||
for symbol in self.symbols:
|
||||
binance_symbol = symbol.replace('/', '').upper()
|
||||
self.tick_buffers[binance_symbol] = deque(maxlen=self.buffer_size)
|
||||
|
||||
# Performance tracking for subscribers
|
||||
self.distribution_stats = {
|
||||
'total_ticks_received': 0,
|
||||
'total_ticks_distributed': 0,
|
||||
'distribution_errors': 0,
|
||||
'last_tick_time': {},
|
||||
'ticks_per_symbol': {symbol.replace('/', '').upper(): 0 for symbol in self.symbols}
|
||||
}
|
||||
|
||||
# Data validation
|
||||
self.last_prices = {symbol.replace('/', '').upper(): 0.0 for symbol in self.symbols}
|
||||
self.price_change_threshold = 0.1 # 10% price change threshold for validation
|
||||
|
||||
# Cache settings
|
||||
self.cache_enabled = self.config.data.get('cache_enabled', True)
|
||||
self.cache_dir = Path(self.config.data.get('cache_dir', 'cache'))
|
||||
@ -55,12 +108,13 @@ class DataProvider:
|
||||
|
||||
# Timeframe conversion
|
||||
self.timeframe_seconds = {
|
||||
'1m': 60, '5m': 300, '15m': 900, '30m': 1800,
|
||||
'1s': 1, '1m': 60, '5m': 300, '15m': 900, '30m': 1800,
|
||||
'1h': 3600, '4h': 14400, '1d': 86400
|
||||
}
|
||||
|
||||
logger.info(f"DataProvider initialized for symbols: {self.symbols}")
|
||||
logger.info(f"Timeframes: {self.timeframes}")
|
||||
logger.info("Centralized data distribution enabled")
|
||||
|
||||
def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000, refresh: bool = False) -> Optional[pd.DataFrame]:
|
||||
"""Get historical OHLCV data for a symbol and timeframe"""
|
||||
@ -372,12 +426,14 @@ class DataProvider:
|
||||
self.websocket_tasks.clear()
|
||||
|
||||
async def _websocket_stream(self, symbol: str):
|
||||
"""WebSocket stream for a single symbol"""
|
||||
binance_symbol = symbol.replace('/', '').lower()
|
||||
url = f"wss://stream.binance.com:9443/ws/{binance_symbol}@ticker"
|
||||
"""WebSocket stream for a single symbol using trade stream for better granularity"""
|
||||
binance_symbol = symbol.replace('/', '').upper()
|
||||
url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@trade"
|
||||
|
||||
while self.is_streaming:
|
||||
try:
|
||||
logger.info(f"Connecting to WebSocket for {symbol}: {url}")
|
||||
|
||||
async with websockets.connect(url) as websocket:
|
||||
logger.info(f"WebSocket connected for {symbol}")
|
||||
|
||||
@ -386,27 +442,75 @@ class DataProvider:
|
||||
break
|
||||
|
||||
try:
|
||||
data = json.loads(message)
|
||||
await self._process_tick(symbol, data)
|
||||
await self._process_trade_message(binance_symbol, message)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error processing tick for {symbol}: {e}")
|
||||
logger.warning(f"Error processing trade message for {symbol}: {e}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"WebSocket error for {symbol}: {e}")
|
||||
self.distribution_stats['distribution_errors'] += 1
|
||||
|
||||
if self.is_streaming:
|
||||
logger.info(f"Reconnecting WebSocket for {symbol} in 5 seconds...")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
async def _process_tick(self, symbol: str, tick_data: Dict):
|
||||
async def _process_trade_message(self, symbol: str, message: str):
|
||||
"""Process incoming trade message and distribute to subscribers"""
|
||||
try:
|
||||
trade_data = json.loads(message)
|
||||
|
||||
# Extract trade information
|
||||
price = float(trade_data.get('p', 0))
|
||||
quantity = float(trade_data.get('q', 0))
|
||||
timestamp = datetime.fromtimestamp(int(trade_data.get('T', 0)) / 1000)
|
||||
is_buyer_maker = trade_data.get('m', False)
|
||||
trade_id = trade_data.get('t', '')
|
||||
|
||||
# Calculate volume in USDT
|
||||
volume_usdt = price * quantity
|
||||
|
||||
# Data validation
|
||||
if not self._validate_tick_data(symbol, price, volume_usdt):
|
||||
logger.warning(f"Invalid tick data for {symbol}: price={price}, volume={volume_usdt}")
|
||||
return
|
||||
|
||||
# Create standardized tick
|
||||
tick = MarketTick(
|
||||
symbol=symbol,
|
||||
timestamp=timestamp,
|
||||
price=price,
|
||||
volume=volume_usdt,
|
||||
quantity=quantity,
|
||||
side='sell' if is_buyer_maker else 'buy',
|
||||
trade_id=str(trade_id),
|
||||
is_buyer_maker=is_buyer_maker,
|
||||
raw_data=trade_data
|
||||
)
|
||||
|
||||
# Add to buffer
|
||||
self.tick_buffers[symbol].append(tick)
|
||||
|
||||
# Update statistics
|
||||
self.distribution_stats['total_ticks_received'] += 1
|
||||
self.distribution_stats['ticks_per_symbol'][symbol] += 1
|
||||
self.distribution_stats['last_tick_time'][symbol] = timestamp
|
||||
self.last_prices[symbol] = price
|
||||
|
||||
# Update current prices and candles
|
||||
await self._process_tick(symbol, tick)
|
||||
|
||||
# Distribute to all subscribers
|
||||
self._distribute_tick(tick)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing trade message for {symbol}: {e}")
|
||||
|
||||
async def _process_tick(self, symbol: str, tick: MarketTick):
|
||||
"""Process a single tick and update candles"""
|
||||
try:
|
||||
price = float(tick_data.get('c', 0)) # Current price
|
||||
volume = float(tick_data.get('v', 0)) # 24h Volume
|
||||
timestamp = pd.Timestamp.now()
|
||||
|
||||
# Update current price
|
||||
with self.data_lock:
|
||||
self.current_prices[symbol] = price
|
||||
self.current_prices[symbol] = tick.price
|
||||
|
||||
# Initialize real-time data structure if needed
|
||||
if symbol not in self.real_time_data:
|
||||
@ -414,16 +518,16 @@ class DataProvider:
|
||||
for tf in self.timeframes:
|
||||
self.real_time_data[symbol][tf] = deque(maxlen=1000)
|
||||
|
||||
# Create tick record
|
||||
tick = {
|
||||
'timestamp': timestamp,
|
||||
'price': price,
|
||||
'volume': volume
|
||||
# Create tick record for candle updates
|
||||
tick_record = {
|
||||
'timestamp': tick.timestamp,
|
||||
'price': tick.price,
|
||||
'volume': tick.volume
|
||||
}
|
||||
|
||||
# Update all timeframes
|
||||
for timeframe in self.timeframes:
|
||||
self._update_candle(symbol, timeframe, tick)
|
||||
self._update_candle(symbol, timeframe, tick_record)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing tick for {symbol}: {e}")
|
||||
@ -806,4 +910,148 @@ class DataProvider:
|
||||
not self.historical_data[symbol][tf].empty)
|
||||
status['historical_data_loaded'][symbol][tf] = has_data
|
||||
|
||||
return status
|
||||
return status
|
||||
|
||||
def subscribe_to_ticks(self, callback: Callable[[MarketTick], None],
|
||||
symbols: List[str] = None,
|
||||
subscriber_name: str = None) -> str:
|
||||
"""Subscribe to real-time tick data updates"""
|
||||
subscriber_id = str(uuid.uuid4())[:8]
|
||||
subscriber_name = subscriber_name or f"subscriber_{subscriber_id}"
|
||||
|
||||
# Convert symbols to Binance format
|
||||
if symbols:
|
||||
binance_symbols = [s.replace('/', '').upper() for s in symbols]
|
||||
else:
|
||||
binance_symbols = [s.replace('/', '').upper() for s in self.symbols]
|
||||
|
||||
subscriber = DataSubscriber(
|
||||
subscriber_id=subscriber_id,
|
||||
callback=callback,
|
||||
symbols=binance_symbols,
|
||||
subscriber_name=subscriber_name
|
||||
)
|
||||
|
||||
with self.subscriber_lock:
|
||||
self.subscribers[subscriber_id] = subscriber
|
||||
|
||||
logger.info(f"New tick subscriber registered: {subscriber_name} ({subscriber_id}) for symbols: {binance_symbols}")
|
||||
|
||||
# Send recent tick data to new subscriber
|
||||
self._send_recent_ticks_to_subscriber(subscriber)
|
||||
|
||||
return subscriber_id
|
||||
|
||||
def unsubscribe_from_ticks(self, subscriber_id: str):
|
||||
"""Unsubscribe from tick data updates"""
|
||||
with self.subscriber_lock:
|
||||
if subscriber_id in self.subscribers:
|
||||
subscriber_name = self.subscribers[subscriber_id].subscriber_name
|
||||
self.subscribers[subscriber_id].active = False
|
||||
del self.subscribers[subscriber_id]
|
||||
logger.info(f"Subscriber {subscriber_name} ({subscriber_id}) unsubscribed")
|
||||
|
||||
def _send_recent_ticks_to_subscriber(self, subscriber: DataSubscriber):
|
||||
"""Send recent tick data to a new subscriber"""
|
||||
try:
|
||||
for symbol in subscriber.symbols:
|
||||
if symbol in self.tick_buffers:
|
||||
# Send last 50 ticks to get subscriber up to speed
|
||||
recent_ticks = list(self.tick_buffers[symbol])[-50:]
|
||||
for tick in recent_ticks:
|
||||
try:
|
||||
subscriber.callback(tick)
|
||||
except Exception as e:
|
||||
logger.warning(f"Error sending recent tick to subscriber {subscriber.subscriber_id}: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending recent ticks: {e}")
|
||||
|
||||
def _distribute_tick(self, tick: MarketTick):
|
||||
"""Distribute tick to all relevant subscribers"""
|
||||
distributed_count = 0
|
||||
|
||||
with self.subscriber_lock:
|
||||
subscribers_to_remove = []
|
||||
|
||||
for subscriber_id, subscriber in self.subscribers.items():
|
||||
if not subscriber.active:
|
||||
subscribers_to_remove.append(subscriber_id)
|
||||
continue
|
||||
|
||||
if tick.symbol in subscriber.symbols:
|
||||
try:
|
||||
# Call subscriber callback in a thread to avoid blocking
|
||||
def call_callback():
|
||||
try:
|
||||
subscriber.callback(tick)
|
||||
subscriber.tick_count += 1
|
||||
subscriber.last_update = datetime.now()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error in subscriber {subscriber_id} callback: {e}")
|
||||
subscriber.active = False
|
||||
|
||||
# Use thread to avoid blocking the main data processing
|
||||
Thread(target=call_callback, daemon=True).start()
|
||||
distributed_count += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error distributing tick to subscriber {subscriber_id}: {e}")
|
||||
subscriber.active = False
|
||||
|
||||
# Remove inactive subscribers
|
||||
for subscriber_id in subscribers_to_remove:
|
||||
if subscriber_id in self.subscribers:
|
||||
del self.subscribers[subscriber_id]
|
||||
|
||||
self.distribution_stats['total_ticks_distributed'] += distributed_count
|
||||
|
||||
def _validate_tick_data(self, symbol: str, price: float, volume: float) -> bool:
|
||||
"""Validate incoming tick data for quality"""
|
||||
try:
|
||||
# Basic validation
|
||||
if price <= 0 or volume < 0:
|
||||
return False
|
||||
|
||||
# Price change validation
|
||||
last_price = self.last_prices.get(symbol, 0)
|
||||
if last_price > 0:
|
||||
price_change_pct = abs(price - last_price) / last_price
|
||||
if price_change_pct > self.price_change_threshold:
|
||||
logger.warning(f"Large price change for {symbol}: {price_change_pct:.2%}")
|
||||
# Don't reject, just warn - could be legitimate
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error validating tick data: {e}")
|
||||
return False
|
||||
|
||||
def get_recent_ticks(self, symbol: str, count: int = 100) -> List[MarketTick]:
|
||||
"""Get recent ticks for a symbol"""
|
||||
binance_symbol = symbol.replace('/', '').upper()
|
||||
if binance_symbol in self.tick_buffers:
|
||||
return list(self.tick_buffers[binance_symbol])[-count:]
|
||||
return []
|
||||
|
||||
def get_subscriber_stats(self) -> Dict[str, Any]:
|
||||
"""Get subscriber and distribution statistics"""
|
||||
with self.subscriber_lock:
|
||||
active_subscribers = len([s for s in self.subscribers.values() if s.active])
|
||||
subscriber_stats = {
|
||||
sid: {
|
||||
'name': s.subscriber_name,
|
||||
'active': s.active,
|
||||
'symbols': s.symbols,
|
||||
'tick_count': s.tick_count,
|
||||
'last_update': s.last_update.isoformat() if s.last_update else None
|
||||
}
|
||||
for sid, s in self.subscribers.items()
|
||||
}
|
||||
|
||||
return {
|
||||
'active_subscribers': active_subscribers,
|
||||
'total_subscribers': len(self.subscribers),
|
||||
'subscriber_details': subscriber_stats,
|
||||
'distribution_stats': self.distribution_stats.copy(),
|
||||
'buffer_sizes': {symbol: len(buffer) for symbol, buffer in self.tick_buffers.items()}
|
||||
}
|
Reference in New Issue
Block a user