# gogo2/core/data_provider.py
"""
Multi-Timeframe, Multi-Symbol Data Provider
CRITICAL POLICY: NO SYNTHETIC DATA ALLOWED
This module MUST ONLY use real market data from exchanges.
NEVER use np.random.*, mock/fake/synthetic data, or placeholder values.
If data is unavailable: return None/0/empty, log errors, raise exceptions.
See: reports/REAL_MARKET_DATA_POLICY.md
This module consolidates all data functionality including:
- Historical data fetching from Binance API
- Real-time data streaming via WebSocket
- Multi-timeframe candle generation
- Caching and data management
- Technical indicators calculation
- Williams Market Structure pivot points with monthly data analysis
- Pivot-based feature normalization for improved model training
- Centralized data distribution to multiple subscribers (AI models, dashboard, etc.)
"""
import asyncio
import json
import logging
import os
import time
import uuid
import websockets
import requests
import pandas as pd
import numpy as np
import pickle
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any, Callable
from dataclasses import dataclass, field
import ta
import warnings
from threading import Thread, Lock
from collections import deque
import math
# Suppress ta library deprecation warnings
warnings.filterwarnings("ignore", category=FutureWarning, module="ta")
# Import timezone utilities
from utils.timezone_utils import (
normalize_timestamp, normalize_dataframe_timestamps, normalize_dataframe_index,
now_system, now_utc, to_sofia, UTC, SOFIA_TZ, log_timezone_info
)
from .config import get_config
from .tick_aggregator import RealTimeTickAggregator, RawTick, OHLCVBar
from .cnn_monitor import log_cnn_prediction
from .williams_market_structure import WilliamsMarketStructure, PivotPoint, TrendLevel
from .enhanced_cob_websocket import EnhancedCOBWebSocket, get_enhanced_cob_websocket
from .huobi_cob_websocket import get_huobi_cob_websocket
from .cob_integration import COBIntegration
logger = logging.getLogger(__name__)
@dataclass
class PivotBounds:
"""Pivot-based normalization bounds derived from Williams Market Structure"""
symbol: str
price_max: float
price_min: float
volume_max: float
volume_min: float
pivot_support_levels: List[float]
pivot_resistance_levels: List[float]
pivot_context: Dict[str, Any]
created_timestamp: datetime
data_period_start: datetime
data_period_end: datetime
total_candles_analyzed: int
def get_price_range(self) -> float:
"""Get price range for normalization"""
return self.price_max - self.price_min
def normalize_price(self, price: float) -> float:
"""Normalize price using pivot bounds"""
return (price - self.price_min) / self.get_price_range()
def get_nearest_support_distance(self, current_price: float) -> float:
"""Get distance to nearest support level (normalized)"""
if not self.pivot_support_levels:
return 0.5
distances = [abs(current_price - s) for s in self.pivot_support_levels]
return min(distances) / self.get_price_range()
def get_nearest_resistance_distance(self, current_price: float) -> float:
"""Get distance to nearest resistance level (normalized)"""
if not self.pivot_resistance_levels:
return 0.5
distances = [abs(current_price - r) for r in self.pivot_resistance_levels]
return min(distances) / self.get_price_range()
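# Illustrative use of PivotBounds normalization (hypothetical values, not real market data):
#     bounds = PivotBounds(symbol='ETH/USDT', price_max=2800.0, price_min=2200.0,
#                          volume_max=1e6, volume_min=0.0,
#                          pivot_support_levels=[2300.0, 2450.0],
#                          pivot_resistance_levels=[2600.0, 2750.0],
#                          pivot_context={}, created_timestamp=datetime.now(),
#                          data_period_start=datetime.now() - timedelta(days=30),
#                          data_period_end=datetime.now(), total_candles_analyzed=43200)
#     bounds.get_price_range()                     # 600.0
#     bounds.normalize_price(2500.0)               # (2500 - 2200) / 600 = 0.5
#     bounds.get_nearest_support_distance(2500.0)  # min(200, 50) / 600 ≈ 0.083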
@dataclass
class SimplePivotLevel:
"""Simple pivot level structure for fallback pivot detection"""
swing_points: List[Any] = field(default_factory=list)
support_levels: List[float] = field(default_factory=list)
resistance_levels: List[float] = field(default_factory=list)
@dataclass
class MarketTick:
"""Standardized market tick data structure"""
symbol: str
timestamp: datetime
price: float
volume: float
quantity: float
side: str # 'buy' or 'sell'
trade_id: str
is_buyer_maker: bool
raw_data: Dict[str, Any] = field(default_factory=dict)
@dataclass
class DataSubscriber:
"""Data subscriber information"""
subscriber_id: str
callback: Callable[[MarketTick], None]
symbols: List[str]
active: bool = True
last_update: datetime = field(default_factory=datetime.now)
tick_count: int = 0
subscriber_name: str = "unknown"
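# Sketch of a subscriber callback matching the Callable[[MarketTick], None] contract above.
# Registration goes through the provider's subscriber API (not shown in this excerpt), so
# only the callback shape is illustrated here:
#     def on_tick(tick: MarketTick) -> None:
#         if tick.side == 'buy':
#             print(f"{tick.symbol} buy {tick.quantity} @ {tick.price}")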
class DataProvider:
"""Unified data provider for historical and real-time market data with centralized distribution"""
def __init__(self, symbols: List[str] = None, timeframes: List[str] = None):
"""Initialize the data provider"""
self.config = get_config()
# Fixed symbols and timeframes for caching (the constructor arguments are currently ignored)
self.symbols = ['ETH/USDT', 'BTC/USDT']
self.timeframes = ['1s', '1m', '1h', '1d']
# Cache settings (initialize first)
self.cache_enabled = True
self.cache_dir = Path('cache')
self.cache_dir.mkdir(parents=True, exist_ok=True)
# Data storage - cached OHLCV data (1500 candles each)
self.cached_data = {} # {symbol: {timeframe: DataFrame}}
self.real_time_data = {} # {symbol: {timeframe: deque}}
self.current_prices = {} # {symbol: float}
# Live price cache for low-latency price updates
self.live_price_cache: Dict[str, Tuple[float, datetime]] = {}
self.live_price_cache_ttl = timedelta(milliseconds=500)
# Initialize cached data structure
for symbol in self.symbols:
self.cached_data[symbol] = {}
for timeframe in self.timeframes:
self.cached_data[symbol][timeframe] = pd.DataFrame()
# Pivot-based normalization system
self.pivot_bounds: Dict[str, PivotBounds] = {} # {symbol: PivotBounds}
self.pivot_cache_dir = self.cache_dir / 'pivot_bounds'
self.pivot_cache_dir.mkdir(parents=True, exist_ok=True)
self.pivot_refresh_interval = timedelta(days=1) # Refresh pivot bounds daily
self.monthly_data_cache_dir = self.cache_dir / 'monthly_1s_data'
self.monthly_data_cache_dir.mkdir(parents=True, exist_ok=True)
# Enhanced WebSocket integration
self.enhanced_cob_websocket: Optional[EnhancedCOBWebSocket] = None
self.websocket_tasks = {}
# COB collection state guard to prevent duplicate starts
self._cob_started: bool = False
# Ensure COB collection is started so BaseDataInput includes real order book data
try:
self.start_cob_collection()
except Exception as _cob_init_ex:
logger.error(f"Failed to start COB collection at init: {_cob_init_ex}")
self.is_streaming = False
self.data_lock = Lock()
# COB data from enhanced WebSocket
self.cob_websocket_data: Dict[str, Dict] = {} # Latest COB data from WebSocket
self.cob_websocket_status: Dict[str, str] = {} # WebSocket status per symbol
# Subscriber management for centralized data distribution
self.subscribers: Dict[str, DataSubscriber] = {}
self.subscriber_lock = Lock()
self.tick_buffers: Dict[str, deque] = {}
self.buffer_size = 1000 # Keep last 1000 ticks per symbol
# Initialize tick buffers
for symbol in self.symbols:
binance_symbol = symbol.replace('/', '').upper()
self.tick_buffers[binance_symbol] = deque(maxlen=self.buffer_size)
# BOM (Book of Market) data caching - 1s resolution for last 5 minutes
self.bom_cache_duration = 300 # 5 minutes in seconds
self.bom_feature_count = 120 # Number of BOM features per timestamp
self.bom_data_cache: Dict[str, deque] = {} # {symbol: deque of (timestamp, bom_features)}
# Initialize BOM cache for each symbol
for symbol in self.symbols:
# Store 300 seconds worth of 1s BOM data
self.bom_data_cache[symbol] = deque(maxlen=self.bom_cache_duration)
# Initialize tick aggregator for raw tick processing
binance_symbols = [symbol.replace('/', '').upper() for symbol in self.symbols]
self.tick_aggregator = RealTimeTickAggregator(symbols=binance_symbols)
# Raw tick and OHLCV bar callbacks
self.raw_tick_callbacks = []
self.ohlcv_bar_callbacks = []
# Performance tracking for subscribers
self.distribution_stats = {
'total_ticks_received': 0,
'total_ticks_distributed': 0,
'distribution_errors': 0,
'last_tick_time': {},
'ticks_per_symbol': {symbol.replace('/', '').upper(): 0 for symbol in self.symbols},
'raw_ticks_processed': 0,
'ohlcv_bars_created': 0,
'patterns_detected': 0
}
# Data validation
self.last_prices = {symbol.replace('/', '').upper(): 0.0 for symbol in self.symbols}
self.price_change_threshold = 0.1 # 10% price change threshold for validation
# Timeframe conversion
self.timeframe_seconds = {
'1s': 1, '1m': 60, '5m': 300, '15m': 900, '30m': 1800,
'1h': 3600, '4h': 14400, '1d': 86400
}
# Williams Market Structure integration
self.williams_structure: Dict[str, WilliamsMarketStructure] = {}
for symbol in self.symbols:
self.williams_structure[symbol] = WilliamsMarketStructure(min_pivot_distance=3)
# Pivot point caching
self.pivot_points_cache: Dict[str, Dict[int, TrendLevel]] = {} # {symbol: {level: TrendLevel}}
self.last_pivot_calculation: Dict[str, datetime] = {}
self.pivot_calculation_interval = timedelta(minutes=5) # Recalculate every 5 minutes
# Auto-fix corrupted cache files on startup
self._auto_fix_corrupted_cache()
# Load existing pivot bounds from cache
self._load_all_pivot_bounds()
# COB (Consolidated Order Book) data system using WebSocket
self.cob_integration: Optional[COBIntegration] = None
# COB data storage - 30 minutes of raw ticks and 1s aggregated data
self.cob_raw_ticks: Dict[str, deque] = {} # Raw COB ticks (30 min)
self.cob_1s_aggregated: Dict[str, deque] = {} # 1s aggregated COB data with $1 buckets (30 min)
# Initialize COB data structures
for symbol in self.symbols:
# Raw ticks: ~100 ticks/second * 30 minutes = ~180,000 ticks
self.cob_raw_ticks[symbol] = deque(maxlen=180000)
# 1s aggregated: 30 minutes = 1,800 seconds
self.cob_1s_aggregated[symbol] = deque(maxlen=1800)
# COB callbacks for data distribution
self.cob_data_callbacks: List[Callable] = []
self.cob_aggregated_callbacks: List[Callable] = []
# Training data collection (simplified)
self.training_data_cache: Dict[str, deque] = {}
self.training_data_callbacks: List[Callable] = []
self.model_prediction_callbacks: List[Callable] = []
# Initialize training data cache
for symbol in self.symbols:
binance_symbol = symbol.replace('/', '').upper()
self.training_data_cache[binance_symbol] = deque(maxlen=1000)
# Data collection threads
self.data_collection_active = False
# COB data collection
self.cob_collection_active = False
self.cob_collection_thread = None
# Training data collection
self.training_data_collection_active = False
self.training_data_thread = None
# Price-level bucketing
self.bucketed_cob_data: Dict[str, Dict] = {}
self.bucket_sizes = [1, 10] # $1 and $10 buckets
self.bucketed_cob_callbacks: Dict[int, List[Callable]] = {size: [] for size in self.bucket_sizes}
# Automatic data maintenance
self.data_maintenance_active = False
self.data_maintenance_thread = None
# Timeframe intervals in seconds for automatic updates
self.timeframe_intervals = {
'1s': 1,
'1m': 60,
'1h': 3600,
'1d': 86400
}
logger.info(f"DataProvider initialized for symbols: {self.symbols}")
logger.info(f"Timeframes: {self.timeframes}")
logger.info("Automatic data maintenance enabled")
logger.info("Centralized data distribution enabled")
logger.info("Pivot-based normalization system enabled")
logger.info("Williams Market Structure integration enabled")
logger.info("COB and training data collection enabled")
# Rate limiting
self.last_request_time = {}
self.request_interval = 0.5 # 500ms between requests to avoid rate limits
self.retry_delay = 60 # 1 minute retry delay for 451 errors
self.max_retries = 3
# Start automatic data maintenance
self.start_automatic_data_maintenance()
# Start COB WebSocket integration
self.start_cob_websocket_integration()
def start_automatic_data_maintenance(self):
"""Start automatic data maintenance system"""
if self.data_maintenance_active:
logger.warning("Data maintenance already active")
return
self.data_maintenance_active = True
self.data_maintenance_thread = Thread(target=self._data_maintenance_worker, daemon=True)
self.data_maintenance_thread.start()
logger.info("Automatic data maintenance started")
def stop_automatic_data_maintenance(self):
"""Stop automatic data maintenance system"""
self.data_maintenance_active = False
if self.data_maintenance_thread and self.data_maintenance_thread.is_alive():
self.data_maintenance_thread.join(timeout=5)
logger.info("Automatic data maintenance stopped")
def _data_maintenance_worker(self):
"""Worker thread for automatic data maintenance"""
logger.info("Data maintenance worker started")
# Initial data load
self._initial_data_load()
# Track last update times for each symbol/timeframe
last_updates = {}
for symbol in self.symbols:
last_updates[symbol] = {}
for timeframe in self.timeframes:
last_updates[symbol][timeframe] = 0
while self.data_maintenance_active:
try:
current_time = time.time()
# Check each symbol/timeframe for updates
for symbol in self.symbols:
for timeframe in self.timeframes:
interval = self.timeframe_intervals[timeframe]
half_interval = interval / 2
# Update every half candle period
if current_time - last_updates[symbol][timeframe] >= half_interval:
self._update_cached_data(symbol, timeframe)
last_updates[symbol][timeframe] = current_time
# Sleep for 1 second before next check
time.sleep(1)
except Exception as e:
logger.error(f"Error in data maintenance worker: {e}")
time.sleep(10) # Wait longer on error
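# Refresh cadence implied by the half-interval rule above: 1s candles are refreshed on
# every 1s pass of the loop, 1m candles roughly every 30s, 1h candles every 1800s and
# 1d candles every 43200s.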
def _initial_data_load(self):
"""Load initial 1500 candles for each symbol/timeframe"""
logger.info("Starting initial data load (1500 candles each)")
for symbol in self.symbols:
for timeframe in self.timeframes:
try:
logger.info(f"Loading initial data for {symbol} {timeframe}")
df = self._fetch_from_binance(symbol, timeframe, 1500)
if df is None or df.empty:
logger.warning(f"Binance failed for {symbol} {timeframe}, trying MEXC")
df = self._fetch_from_mexc(symbol, timeframe, 1500)
if df is not None and not df.empty:
# Ensure proper datetime index
df = self._ensure_datetime_index(df)
# Store in cached data
self.cached_data[symbol][timeframe] = df
logger.info(f"Loaded {len(df)} candles for {symbol} {timeframe}")
else:
logger.error(f"Failed to load initial data for {symbol} {timeframe}")
# Rate limiting between requests
time.sleep(0.5)
except Exception as e:
logger.error(f"Error loading initial data for {symbol} {timeframe}: {e}")
logger.info("Initial data load completed")
def _update_cached_data(self, symbol: str, timeframe: str):
"""Update cached data by fetching last 2 candles"""
try:
# Fetch last 2 candles
df = self._fetch_from_binance(symbol, timeframe, 2)
if df is None or df.empty:
df = self._fetch_from_mexc(symbol, timeframe, 2)
if df is not None and not df.empty:
# Ensure proper datetime index
df = self._ensure_datetime_index(df)
# Get existing cached data
existing_df = self.cached_data[symbol][timeframe]
if not existing_df.empty:
# Merge new data with existing, avoiding duplicates
combined_df = pd.concat([existing_df, df], ignore_index=False)
combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
combined_df = combined_df.sort_index()
# Keep only last 1500 candles
self.cached_data[symbol][timeframe] = combined_df.tail(1500)
else:
self.cached_data[symbol][timeframe] = df
logger.debug(f"Updated cached data for {symbol} {timeframe}: {len(self.cached_data[symbol][timeframe])} candles")
except Exception as e:
logger.debug(f"Error updating cached data for {symbol} {timeframe}: {e}")
def start_cob_websocket_integration(self):
"""Start COB WebSocket integration using COBIntegration class"""
try:
logger.info("Starting COB WebSocket integration")
# Initialize COB integration
self.cob_integration = COBIntegration(data_provider=self, symbols=self.symbols)
# Add callback for COB data
self.cob_integration.add_dashboard_callback(self._on_cob_websocket_update)
# Start COB integration in background thread
cob_thread = Thread(target=self._run_cob_integration, daemon=True)
cob_thread.start()
# Start 1s aggregation worker
aggregation_thread = Thread(target=self._cob_aggregation_worker, daemon=True)
aggregation_thread.start()
logger.info("COB WebSocket integration started")
except Exception as e:
logger.error(f"Error starting COB WebSocket integration: {e}")
def _run_cob_integration(self):
"""Run COB integration in asyncio event loop"""
try:
# Create new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Run COB integration
loop.run_until_complete(self.cob_integration.start())
except Exception as e:
logger.error(f"Error running COB integration: {e}")
def _on_cob_websocket_update(self, symbol: str, cob_data: Dict):
"""Handle COB updates from WebSocket"""
try:
# Extract the actual COB data from the wrapper
if 'data' in cob_data:
actual_data = cob_data['data']
else:
actual_data = cob_data
# Create raw tick entry
raw_tick = {
'symbol': symbol,
'timestamp': now_system(), # Use system timezone consistently
'bids': actual_data.get('bids', [])[:50], # Top 50 levels
'asks': actual_data.get('asks', [])[:50], # Top 50 levels
'stats': actual_data.get('stats', {}),
'source': 'websocket'
}
# Store raw tick
self.cob_raw_ticks[symbol].append(raw_tick)
# Distribute to raw COB callbacks
for callback in self.cob_data_callbacks:
try:
callback(symbol, raw_tick)
except Exception as e:
logger.error(f"Error in COB callback: {e}")
logger.debug(f"Processed COB WebSocket update for {symbol}")
except Exception as e:
logger.error(f"Error processing COB WebSocket update for {symbol}: {e}")
def _cob_aggregation_worker(self):
"""Worker thread for 1s COB aggregation with $1 price buckets"""
logger.info("Starting COB 1s aggregation worker")
# Track last aggregation time for each symbol
last_aggregation = {symbol: 0 for symbol in self.symbols}
while True:
try:
current_time = time.time()
current_second = int(current_time)
# Process each symbol
for symbol in self.symbols:
# Aggregate every second
if current_second > last_aggregation[symbol]:
self._aggregate_cob_1s(symbol, current_second - 1)
last_aggregation[symbol] = current_second
# Sleep until next second boundary
sleep_time = 1.0 - (current_time % 1.0)
time.sleep(max(0.1, sleep_time))
except Exception as e:
logger.error(f"Error in COB aggregation worker: {e}")
time.sleep(1)
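# Boundary alignment example for the sleep above: if current_time ends in .37 of a second,
# the worker sleeps 1.0 - 0.37 = 0.63s so the next pass lands near the next whole second
# (never less than the 0.1s floor).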
def _aggregate_cob_1s(self, symbol: str, target_second: int):
"""Aggregate COB data for 1 second with $1 price buckets and multi-timeframe imbalances"""
try:
# Get raw ticks for the target second
target_ticks = []
# FIXED: Create a copy of the deque to avoid mutation during iteration
if symbol in self.cob_raw_ticks:
# Create a safe copy of the deque to iterate over
ticks_copy = list(self.cob_raw_ticks[symbol])
for tick in ticks_copy:
tick_timestamp = tick['timestamp']
# Handle both datetime and float timestamps
if isinstance(tick_timestamp, datetime):
tick_time = tick_timestamp.timestamp()
else:
tick_time = float(tick_timestamp)
# Check if tick is in target second
if target_second <= tick_time < target_second + 1:
target_ticks.append(tick)
if not target_ticks:
return
# Aggregate the ticks with $1 price buckets
aggregated_data = self._create_1s_cob_aggregation(symbol, target_ticks, target_second)
# Add multi-timeframe imbalance calculations
aggregated_data = self._add_multi_timeframe_imbalances(symbol, aggregated_data, target_second)
# Store aggregated data
self.cob_1s_aggregated[symbol].append(aggregated_data)
# Distribute to aggregated COB callbacks
for callback in self.cob_aggregated_callbacks:
try:
callback(symbol, aggregated_data)
except Exception as e:
logger.error(f"Error in COB aggregated callback: {e}")
logger.debug(f"Aggregated {len(target_ticks)} COB ticks for {symbol} at second {target_second}")
except Exception as e:
logger.error(f"Error aggregating COB 1s for {symbol}: {e}")
def _add_multi_timeframe_imbalances(self, symbol: str, aggregated_data: Dict, current_second: int) -> Dict:
"""Add COB-based order book imbalances with configurable price ranges"""
try:
# Get price range based on symbol
price_range = self._get_price_range_for_symbol(symbol)
# Get latest COB data for current imbalance calculation
latest_cob = self.get_latest_cob_data(symbol)
current_imbalance = 0.0
if latest_cob:
current_imbalance = self._calculate_cob_imbalance(latest_cob, price_range)
# Get historical COB data for timeframe calculations
# FIXED: Create a safe copy to avoid deque mutation during iteration
historical_cob_data = []
if symbol in self.cob_raw_ticks:
try:
historical_cob_data = list(self.cob_raw_ticks[symbol])
except Exception as e:
logger.debug(f"Error copying COB raw ticks for {symbol}: {e}")
historical_cob_data = []
# Calculate imbalances for different timeframes using COB data
imbalances = {
'imbalance_1s': current_imbalance, # Current COB imbalance
'imbalance_5s': self._calculate_timeframe_cob_imbalance(historical_cob_data, 5, price_range),
'imbalance_15s': self._calculate_timeframe_cob_imbalance(historical_cob_data, 15, price_range),
'imbalance_60s': self._calculate_timeframe_cob_imbalance(historical_cob_data, 60, price_range)
}
# Add volume-weighted imbalances within price range
volume_imbalances = {
'volume_imbalance_1s': current_imbalance,
'volume_imbalance_5s': self._calculate_volume_weighted_imbalance(historical_cob_data, 5, price_range),
'volume_imbalance_15s': self._calculate_volume_weighted_imbalance(historical_cob_data, 15, price_range),
'volume_imbalance_60s': self._calculate_volume_weighted_imbalance(historical_cob_data, 60, price_range)
}
# Combine all imbalance metrics
all_imbalances = {**imbalances, **volume_imbalances}
# Add to aggregated data
aggregated_data.update(all_imbalances)
# Also add to stats section for compatibility
if 'stats' not in aggregated_data:
aggregated_data['stats'] = {}
aggregated_data['stats'].update(all_imbalances)
# Add price range information for debugging
aggregated_data['stats']['price_range_used'] = price_range
logger.debug(f"COB imbalances for {symbol} (±${price_range}): {current_imbalance:.4f}")
return aggregated_data
except Exception as e:
logger.error(f"Error calculating COB-based imbalances for {symbol}: {e}")
# Return original data with default imbalances
default_imbalances = {
'imbalance_1s': 0.0, 'imbalance_5s': 0.0, 'imbalance_15s': 0.0, 'imbalance_60s': 0.0,
'volume_imbalance_1s': 0.0, 'volume_imbalance_5s': 0.0, 'volume_imbalance_15s': 0.0, 'volume_imbalance_60s': 0.0
}
aggregated_data.update(default_imbalances)
return aggregated_data
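# After this step the aggregated dict carries eight extra keys, e.g.
# aggregated_data['imbalance_5s'] and aggregated_data['volume_imbalance_60s'],
# with the same values mirrored under aggregated_data['stats'].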
def _get_price_range_for_symbol(self, symbol: str) -> float:
"""Get configurable price range for order book imbalance calculation"""
# Configurable price ranges per symbol
price_ranges = {
'ETH/USDT': 5.0, # $5 range for ETH
'BTC/USDT': 50.0, # $50 range for BTC
}
return price_ranges.get(symbol, 10.0) # Default $10 range for other symbols
def get_current_cob_imbalance(self, symbol: str) -> Dict[str, float]:
"""Get current COB imbalance metrics for a symbol"""
try:
price_range = self._get_price_range_for_symbol(symbol)
latest_cob = self.get_latest_cob_data(symbol)
if not latest_cob:
return {
'imbalance': 0.0,
'price_range': price_range,
'mid_price': 0.0,
'bid_volume_in_range': 0.0,
'ask_volume_in_range': 0.0
}
# Calculate detailed imbalance info
bids = latest_cob.get('bids', [])
asks = latest_cob.get('asks', [])
if not bids or not asks:
return {'imbalance': 0.0, 'price_range': price_range, 'mid_price': 0.0}
# Calculate mid price with proper safety checks
try:
if not bids or not asks or len(bids) == 0 or len(asks) == 0:
return {'imbalance': 0.0, 'price_range': price_range, 'mid_price': 0.0}
best_bid = float(bids[0][0])
best_ask = float(asks[0][0])
mid_price = (best_bid + best_ask) / 2.0
except (IndexError, KeyError, ValueError) as e:
logger.debug(f"Error calculating mid price for {symbol}: {e}")
return {'imbalance': 0.0, 'price_range': price_range, 'mid_price': 0.0, 'error': str(e)}
# Calculate volumes in range with safety checks
price_min = mid_price - price_range
price_max = mid_price + price_range
bid_volume_in_range = 0.0
ask_volume_in_range = 0.0
try:
for price, vol in bids:
price = float(price)
vol = float(vol)
if price_min <= price <= mid_price:
bid_volume_in_range += vol
except (IndexError, KeyError, ValueError) as e:
logger.debug(f"Error processing bid volumes for {symbol}: {e}")
try:
for price, vol in asks:
price = float(price)
vol = float(vol)
if mid_price <= price <= price_max:
ask_volume_in_range += vol
except (IndexError, KeyError, ValueError) as e:
logger.debug(f"Error processing ask volumes for {symbol}: {e}")
total_volume = bid_volume_in_range + ask_volume_in_range
imbalance = (bid_volume_in_range - ask_volume_in_range) / total_volume if total_volume > 0 else 0.0
return {
'imbalance': imbalance,
'price_range': price_range,
'mid_price': mid_price,
'bid_volume_in_range': bid_volume_in_range,
'ask_volume_in_range': ask_volume_in_range,
'total_volume_in_range': total_volume,
'best_bid': best_bid,
'best_ask': best_ask
}
except Exception as e:
logger.error(f"Error getting current COB imbalance for {symbol}: {e}")
return {'imbalance': 0.0, 'price_range': price_range, 'error': str(e)}
def _calculate_cob_imbalance(self, cob_data: Any, price_range: float) -> float:
"""Calculate order book imbalance within specified price range around mid price.
Accepts dict snapshot or COBData-like objects (with bids/asks as list of [price, size]).
"""
try:
# Normalize input
if isinstance(cob_data, dict):
bids = cob_data.get('bids', [])
asks = cob_data.get('asks', [])
else:
# Try attribute access (COBData-like or snapshot objects)
bids = getattr(cob_data, 'bids', []) or []
asks = getattr(cob_data, 'asks', []) or []
if not bids or not asks:
return 0.0
# Calculate mid price with proper safety checks
try:
if not bids or not asks or len(bids) == 0 or len(asks) == 0:
return 0.0
best_bid = float(bids[0][0])
best_ask = float(asks[0][0])
if best_bid <= 0 or best_ask <= 0:
return 0.0
mid_price = (best_bid + best_ask) / 2.0
except (IndexError, KeyError, ValueError) as e:
logger.debug(f"Error calculating mid price: {e}")
return 0.0
# Define price range around mid price
price_min = mid_price - price_range
price_max = mid_price + price_range
# Sum volumes within price range
bid_volume_in_range = 0.0
ask_volume_in_range = 0.0
# Sum bid volumes within range with safety checks
try:
for bid_price, bid_volume in bids:
bid_price = float(bid_price)
bid_volume = float(bid_volume)
if price_min <= bid_price <= mid_price:
bid_volume_in_range += bid_volume
except (IndexError, KeyError, ValueError) as e:
logger.debug(f"Error processing bid volumes: {e}")
# Sum ask volumes within range with safety checks
try:
for ask_price, ask_volume in asks:
ask_price = float(ask_price)
ask_volume = float(ask_volume)
if mid_price <= ask_price <= price_max:
ask_volume_in_range += ask_volume
except (IndexError, KeyError, ValueError) as e:
logger.debug(f"Error processing ask volumes: {e}")
# Calculate imbalance: (bid_volume - ask_volume) / (bid_volume + ask_volume)
total_volume = bid_volume_in_range + ask_volume_in_range
if total_volume > 0:
imbalance = (bid_volume_in_range - ask_volume_in_range) / total_volume
return imbalance
else:
return 0.0
except Exception as e:
logger.error(f"Error calculating COB imbalance: {e}")
return 0.0
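# Worked example of the imbalance formula above (hypothetical book, price_range=5.0):
#     mid_price = (2499.5 + 2500.5) / 2 = 2500.0
#     bid volume in [2495.0, 2500.0] = 12.0, ask volume in [2500.0, 2505.0] = 8.0
#     imbalance = (12.0 - 8.0) / (12.0 + 8.0) = 0.2  (positive = bid-heavy)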
def _calculate_timeframe_cob_imbalance(self, historical_cob_data: List[Dict], seconds: int, price_range: float) -> float:
"""Calculate average COB imbalance over specified timeframe"""
try:
if not historical_cob_data or len(historical_cob_data) == 0:
return 0.0
# Get recent data within timeframe (approximate by using last N ticks)
# Assuming ~100 ticks per second, so N = seconds * 100
max_ticks = seconds * 100
recent_ticks = historical_cob_data[-max_ticks:] if len(historical_cob_data) > max_ticks else historical_cob_data
if not recent_ticks:
return 0.0
# Calculate imbalance for each tick and average
imbalances = []
for tick in recent_ticks:
imbalance = self._calculate_cob_imbalance(tick, price_range)
imbalances.append(imbalance)
if imbalances:
return sum(imbalances) / len(imbalances)
else:
return 0.0
except Exception as e:
logger.error(f"Error calculating {seconds}s COB imbalance: {e}")
return 0.0
def _calculate_volume_weighted_imbalance(self, historical_cob_data: List[Dict], seconds: int, price_range: float) -> float:
"""Calculate volume-weighted average imbalance over timeframe"""
try:
if not historical_cob_data:
return 0.0
# Get recent data within timeframe
max_ticks = seconds * 100 # Approximate ticks per second
recent_ticks = historical_cob_data[-max_ticks:] if len(historical_cob_data) > max_ticks else historical_cob_data
if not recent_ticks:
return 0.0
total_weighted_imbalance = 0.0
total_volume = 0.0
for tick in recent_ticks:
imbalance = self._calculate_cob_imbalance(tick, price_range)
# Calculate total volume in range for weighting
bids = tick.get('bids', [])
asks = tick.get('asks', [])
if bids and asks and len(bids) > 0 and len(asks) > 0:
# Get mid price for this tick with proper safety checks
try:
best_bid = float(bids[0][0])
best_ask = float(asks[0][0])
mid_price = (best_bid + best_ask) / 2.0
except (IndexError, KeyError, ValueError) as e:
logger.debug(f"Skipping tick due to data format issue: {e}")
continue
# Calculate volume in range
price_min = mid_price - price_range
price_max = mid_price + price_range
tick_volume = 0.0
try:
for bid_price, bid_volume in bids:
bid_price = float(bid_price)
bid_volume = float(bid_volume)
if price_min <= bid_price <= mid_price:
tick_volume += bid_volume
except (IndexError, KeyError, ValueError) as e:
logger.debug(f"Error processing bid volumes in weighted calculation: {e}")
try:
for ask_price, ask_volume in asks:
ask_price = float(ask_price)
ask_volume = float(ask_volume)
if mid_price <= ask_price <= price_max:
tick_volume += ask_volume
except (IndexError, KeyError, ValueError) as e:
logger.debug(f"Error processing ask volumes in weighted calculation: {e}")
if tick_volume > 0:
total_weighted_imbalance += imbalance * tick_volume
total_volume += tick_volume
if total_volume > 0:
return total_weighted_imbalance / total_volume
else:
return 0.0
except Exception as e:
logger.error(f"Error calculating volume-weighted {seconds}s imbalance: {e}")
return 0.0
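# The weighting above is a standard volume-weighted mean:
#     weighted_imbalance = sum(imbalance_i * volume_i) / sum(volume_i)
# e.g. ticks with (imbalance, volume) of (0.2, 10.0) and (-0.1, 30.0) give
# (0.2*10.0 - 0.1*30.0) / 40.0 = -0.025, so the higher-volume tick dominates.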
def _create_1s_cob_aggregation(self, symbol: str, ticks: List[Dict], timestamp: int) -> Dict:
"""Create 1s aggregation with $1 price buckets"""
try:
if not ticks:
return {}
# Initialize buckets
bid_buckets = {} # {price_bucket: total_volume}
ask_buckets = {} # {price_bucket: total_volume}
# Statistics tracking
all_mid_prices = []
all_spreads = []
all_imbalances = []
total_bid_volume = 0
total_ask_volume = 0
# Process each tick
for tick in ticks:
stats = tick.get('stats', {})
bids = tick.get('bids', [])
asks = tick.get('asks', [])
# Track statistics
mid_price = stats.get('mid_price', 0)
if mid_price > 0:
all_mid_prices.append(mid_price)
spread = stats.get('spread_bps', 0)
if spread > 0:
all_spreads.append(spread)
imbalance = stats.get('imbalance', 0)
all_imbalances.append(imbalance)
# Process bids with $1 buckets
for bid in bids:
if isinstance(bid, dict):
price = bid.get('price', 0)
volume = bid.get('volume', 0)
elif isinstance(bid, list) and len(bid) >= 2:
price = float(bid[0])
volume = float(bid[1])
else:
continue
if price > 0 and volume > 0:
# Create $1 bucket (floor to nearest dollar)
bucket = math.floor(price)
if bucket not in bid_buckets:
bid_buckets[bucket] = 0
bid_buckets[bucket] += volume
total_bid_volume += volume
# Process asks with $1 buckets
for ask in asks:
if isinstance(ask, dict):
price = ask.get('price', 0)
volume = ask.get('volume', 0)
elif isinstance(ask, list) and len(ask) >= 2:
price = float(ask[0])
volume = float(ask[1])
else:
continue
if price > 0 and volume > 0:
# Create $1 bucket (floor to nearest dollar)
bucket = math.floor(price)
if bucket not in ask_buckets:
ask_buckets[bucket] = 0
ask_buckets[bucket] += volume
total_ask_volume += volume
# Calculate aggregated statistics
avg_mid_price = sum(all_mid_prices) / len(all_mid_prices) if all_mid_prices else 0
avg_spread = sum(all_spreads) / len(all_spreads) if all_spreads else 0
avg_imbalance = sum(all_imbalances) / len(all_imbalances) if all_imbalances else 0
# Calculate current imbalance from total volumes
total_volume = total_bid_volume + total_ask_volume
current_imbalance = (total_bid_volume - total_ask_volume) / total_volume if total_volume > 0 else 0
# Create aggregated data structure
aggregated = {
'symbol': symbol,
'timestamp': timestamp,
'tick_count': len(ticks),
'bucket_size_usd': 1.0, # $1 buckets
'bid_buckets': dict(sorted(bid_buckets.items(), reverse=True)[:50]), # Top 50 bid buckets
'ask_buckets': dict(sorted(ask_buckets.items())[:50]), # Top 50 ask buckets
'imbalance': current_imbalance, # Current 1s imbalance
'total_volume': total_volume,
'stats': {
'avg_mid_price': avg_mid_price,
'avg_spread_bps': avg_spread,
'avg_imbalance': avg_imbalance,
'current_imbalance': current_imbalance,
'total_bid_volume': total_bid_volume,
'total_ask_volume': total_ask_volume,
'total_volume': total_volume,
'bid_bucket_count': len(bid_buckets),
'ask_bucket_count': len(ask_buckets),
# Span of all populated buckets; 0 when no bucket data is available
'price_range_usd': (max(list(bid_buckets.keys()) + list(ask_buckets.keys())) -
min(list(bid_buckets.keys()) + list(ask_buckets.keys())))
if (bid_buckets or ask_buckets) else 0
}
}
return aggregated
except Exception as e:
logger.error(f"Error creating 1s COB aggregation: {e}")
return {}
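# Bucketing example for the $1 aggregation above: bids at 3421.35 (0.8) and 3421.90 (1.2)
# both map to math.floor(price) == 3421, so bid_buckets[3421] accumulates to 2.0.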
def _ensure_datetime_index(self, df: pd.DataFrame) -> pd.DataFrame:
"""Ensure dataframe has proper datetime index"""
if df is None or df.empty:
return df
try:
# If we already have a proper DatetimeIndex, return as is
if isinstance(df.index, pd.DatetimeIndex):
return df
# If timestamp column exists, use it as index
if 'timestamp' in df.columns:
df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)
df.set_index('timestamp', inplace=True)
return df
# If we have a RangeIndex or other non-datetime index, create datetime index
if isinstance(df.index, pd.RangeIndex) or not isinstance(df.index, pd.DatetimeIndex):
# Use current UTC time and work backwards for realistic timestamps
from datetime import datetime, timedelta
end_time = datetime.utcnow()
start_time = end_time - timedelta(minutes=len(df))
df.index = pd.date_range(start=start_time, end=end_time, periods=len(df), tz='UTC')
logger.debug(f"Converted RangeIndex to DatetimeIndex for {len(df)} records")
return df
except Exception as e:
logger.warning(f"Error ensuring datetime index: {e}")
return df
def get_price_range_over_period(self, symbol: str, start_time: datetime,
end_time: datetime, timeframe: str = '1m') -> Optional[Dict[str, float]]:
"""Get min/max price and other statistics over a specific time period"""
try:
# Get historical data for the period
data = self.get_historical_data(symbol, timeframe, limit=50000, refresh=False)
if data is None:
return None
# Filter data for the time range
data = data[(data.index >= start_time) & (data.index <= end_time)]
if len(data) == 0:
return None
# Calculate statistics
price_range = {
'min_price': float(data['low'].min()),
'max_price': float(data['high'].max()),
'open_price': float(data.iloc[0]['open']),
'close_price': float(data.iloc[-1]['close']),
'avg_price': float(data['close'].mean()),
'price_volatility': float(data['close'].std()),
'total_volume': float(data['volume'].sum()),
'data_points': len(data),
'time_range_seconds': (end_time - start_time).total_seconds()
}
return price_range
except Exception as e:
logger.error(f"Error getting price range for {symbol}: {e}")
return None
def get_historical_data(self, symbol: str, timeframe: str, limit: int = 1000, refresh: bool = False) -> Optional[pd.DataFrame]:
"""Get historical OHLCV data.
- Prefer cached data for low latency.
- If cache is empty or refresh=True, fetch real data from exchanges.
- Never generate synthetic data.
"""
try:
# Serve from cache when available
if symbol in self.cached_data and timeframe in self.cached_data[symbol]:
cached_df = self.cached_data[symbol][timeframe]
if not cached_df.empty and not refresh:
return cached_df.tail(limit)
# Cache empty or refresh requested: fetch real data now
df = self._fetch_from_binance(symbol, timeframe, limit)
if (df is None or df.empty):
df = self._fetch_from_mexc(symbol, timeframe, limit)
if df is not None and not df.empty:
df = self._ensure_datetime_index(df)
# Store/merge into cache
if symbol not in self.cached_data:
self.cached_data[symbol] = {}
if timeframe not in self.cached_data[symbol] or self.cached_data[symbol][timeframe].empty:
self.cached_data[symbol][timeframe] = df.tail(1500)
else:
combined_df = pd.concat([self.cached_data[symbol][timeframe], df], ignore_index=False)
combined_df = combined_df[~combined_df.index.duplicated(keep='last')]
combined_df = combined_df.sort_index()
self.cached_data[symbol][timeframe] = combined_df.tail(1500)
logger.info(f"Cached {len(self.cached_data[symbol][timeframe])} candles for {symbol} {timeframe}")
return self.cached_data[symbol][timeframe].tail(limit)
logger.warning(f"No real data available for {symbol} {timeframe} at request time")
return None
except Exception as e:
logger.error(f"Error getting historical data for {symbol} {timeframe}: {e}")
return None
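# Usage sketch (assumes a constructed provider; a symbol/timeframe outside the cached
# set falls through to a live exchange fetch):
#     provider = DataProvider()
#     df = provider.get_historical_data('ETH/USDT', '1m', limit=300)
#     if df is not None:
#         latest_close = df['close'].iloc[-1]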
def _fetch_from_mexc(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]:
"""Fetch data from MEXC API (fallback data source when Binance is unavailable)"""
try:
# For 1s timeframe, generate from WebSocket tick data
if timeframe == '1s':
# logger.debug(f"Generating 1s candles from WebSocket ticks for {symbol}")
return self._generate_1s_candles_from_ticks(symbol, limit)
# Convert symbol format
mexc_symbol = symbol.replace('/', '').upper()
# Convert timeframe for MEXC (excluding 1s)
timeframe_map = {
'1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m',
'1h': '1h', '4h': '4h', '1d': '1d'
}
mexc_timeframe = timeframe_map.get(timeframe)
if mexc_timeframe is None:
logger.warning(f"MEXC doesn't support timeframe {timeframe}, skipping {symbol}")
return None
# MEXC API request
url = "https://api.mexc.com/api/v3/klines"
params = {
'symbol': mexc_symbol,
'interval': mexc_timeframe,
'limit': limit
}
response = requests.get(url, params=params)
response.raise_for_status()
data = response.json()
# Convert to DataFrame (MEXC uses 8 columns vs Binance's 12)
df = pd.DataFrame(data, columns=[
'timestamp', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_volume'
])
# Process columns with proper timezone handling (MEXC returns UTC timestamps)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
# Keep in UTC to match COB WebSocket data (no timezone conversion)
for col in ['open', 'high', 'low', 'close', 'volume']:
df[col] = df[col].astype(float)
# Keep only OHLCV columns
df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
df = df.sort_values('timestamp').reset_index(drop=True)
logger.info(f"MEXC: Fetched {len(df)} candles for {symbol} {timeframe}")
return df
except Exception as e:
logger.error(f"MEXC: Error fetching data: {e}")
return None
def _generate_1s_candles_from_ticks(self, symbol: str, limit: int = 1000) -> Optional[pd.DataFrame]:
"""Generate 1-second OHLCV candles from WebSocket tick data"""
try:
# Get recent ticks from COB data
recent_ticks = self.get_cob_raw_ticks(symbol, count=limit * 10) # Get more ticks than needed
if not recent_ticks:
logger.debug(f"No tick data available for {symbol}, cannot generate 1s candles")
return None
# Group ticks by second and create OHLCV candles
candles = []
current_second = None
current_candle = None
for tick in recent_ticks:
# Extract timestamp and price from tick
if isinstance(tick, dict):
timestamp = tick.get('timestamp')
# Prefer explicit price if available, fallback to stats.mid_price
stats = tick.get('stats', {}) if isinstance(tick.get('stats', {}), dict) else {}
price = tick.get('price')
if not price:
price = tick.get('mid_price') or stats.get('mid_price', 0)
# Strict: if still falsy or non-finite, skip
try:
price = float(price)
except Exception:
price = 0.0
# Volume: do not synthesize from other stats; use provided value or 0.0
volume = tick.get('volume')
try:
volume = float(volume) if volume is not None else 0.0
except Exception:
volume = 0.0
else:
continue
# Normalize timestamp; support seconds or milliseconds since epoch and tz-aware datetimes
if not timestamp or not price or price <= 0:
continue
# Convert timestamp to datetime if needed
if isinstance(timestamp, (int, float)):
import pytz
utc = pytz.UTC
# Handle ms epoch inputs by thresholding reasonable ranges
try:
# If timestamp looks like milliseconds (e.g., > 10^12), convert to seconds
if timestamp > 1e12:
tick_time = datetime.fromtimestamp(timestamp / 1000.0, tz=utc)
else:
tick_time = datetime.fromtimestamp(timestamp, tz=utc)
except Exception:
# Skip bad timestamps cleanly on Windows
continue
# Keep in UTC to match COB WebSocket data
elif isinstance(timestamp, datetime):
import pytz
utc = pytz.UTC
tick_time = timestamp
# If no timezone info, assume UTC and keep in UTC
if tick_time.tzinfo is None:
try:
tick_time = utc.localize(tick_time)
except Exception:
# Fallback: coerce via fromtimestamp using naive seconds
try:
tick_time = datetime.fromtimestamp(tick_time.timestamp(), tz=utc)
except Exception:
continue
# Keep in UTC (no timezone conversion)
else:
continue
# Round to second
tick_second = tick_time.replace(microsecond=0)
# Start new candle if second changed
if current_second != tick_second:
# Save previous candle if exists
if current_candle:
candles.append(current_candle)
# Start new candle
current_second = tick_second
current_candle = {
'timestamp': tick_second,
'open': price,
'high': price,
'low': price,
'close': price,
'volume': volume
}
else:
# Update current candle
current_candle['high'] = max(current_candle['high'], price)
current_candle['low'] = min(current_candle['low'], price)
current_candle['close'] = price
current_candle['volume'] += volume
# Add final candle
if current_candle:
candles.append(current_candle)
if not candles:
logger.debug(f"No valid candles generated for {symbol}")
return None
# Convert to DataFrame and normalize timestamps to UTC tz-aware
df = pd.DataFrame(candles)
if not df.empty and 'timestamp' in df.columns:
# Coerce to datetime with UTC; avoid .dt on non-datetimelike
df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True, errors='coerce')
df = df.dropna(subset=['timestamp'])
df = df.sort_values('timestamp').reset_index(drop=True)
# Limit to requested number
if len(df) > limit:
df = df.tail(limit)
# logger.info(f"Generated {len(df)} 1s candles from {len(recent_ticks)} ticks for {symbol}")
return df
except Exception as e:
# Handle invalid argument or bad timestamps gracefully (Windows-safe)
try:
import errno
if hasattr(e, 'errno') and e.errno == errno.EINVAL:
logger.warning(f"Invalid argument while generating 1s candles for {symbol}; trimming tick buffer and falling back")
else:
logger.error(f"Error generating 1s candles from ticks for {symbol}: {e}")
except Exception:
logger.error(f"Error generating 1s candles from ticks for {symbol}: {e}")
# Always trim a small portion of tick buffer to recover from corrupt front entries
try:
if hasattr(self, 'cob_raw_ticks') and symbol in getattr(self, 'cob_raw_ticks', {}):
buf = self.cob_raw_ticks[symbol]
drop = max(1, min(50, len(buf)//10)) # drop up to 10% or 50 entries
for _ in range(drop):
buf.popleft()
except Exception:
pass
return None
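# Grouping example for the logic above: three ticks inside the same UTC second priced
# 2500.0, 2501.5 and 2499.0 with volumes 1.0, 0.5 and 2.0 collapse into a single candle
# with open=2500.0, high=2501.5, low=2499.0, close=2499.0, volume=3.5.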
def _fetch_from_binance(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]:
"""Fetch data from Binance API with robust rate limiting and error handling"""
try:
from .api_rate_limiter import get_rate_limiter
# For 1s timeframe, try to generate from WebSocket ticks first
if timeframe == '1s':
# Attempt to generate from WebSocket ticks, but throttle attempts to avoid spam
if not hasattr(self, '_last_1s_generation_attempt'):
self._last_1s_generation_attempt = {}
now_ts = time.time()
last_attempt = self._last_1s_generation_attempt.get(symbol, 0)
generated_df = None
if now_ts - last_attempt >= 1.5:
self._last_1s_generation_attempt[symbol] = now_ts
generated_df = self._generate_1s_candles_from_ticks(symbol, limit)
if generated_df is not None and not generated_df.empty:
return generated_df
else:
logger.debug(f"Could not generate 1s candles from ticks for {symbol}; trying Binance API")
# Convert symbol format
binance_symbol = symbol.replace('/', '').upper()
# Convert timeframe (now includes 1s support)
timeframe_map = {
'1s': '1s', '1m': '1m', '5m': '5m', '15m': '15m', '30m': '30m',
'1h': '1h', '4h': '4h', '1d': '1d'
}
binance_timeframe = timeframe_map.get(timeframe, '1h')
# Use rate limiter for API requests
rate_limiter = get_rate_limiter()
# Check if we can make request
can_request, wait_time = rate_limiter.can_make_request('binance_api')
if not can_request:
logger.debug(f"Binance rate limited, waiting {wait_time:.1f}s for {symbol} {timeframe}")
if wait_time > 30: # If wait is too long, use fallback
return self._get_fallback_data(symbol, timeframe, limit)
time.sleep(min(wait_time, 5)) # Cap wait at 5 seconds
# API request with rate limiter
url = "https://api.binance.com/api/v3/klines"
params = {
'symbol': binance_symbol,
'interval': binance_timeframe,
'limit': limit
}
response = rate_limiter.make_request('binance_api', url, 'GET', params=params)
if response is None:
logger.warning(f"Binance API request failed for {symbol} {timeframe} - using fallback")
return self._get_fallback_data(symbol, timeframe, limit)
if response.status_code != 200:
logger.warning(f"Binance API returned {response.status_code} for {symbol} {timeframe}")
return self._get_fallback_data(symbol, timeframe, limit)
data = response.json()
# Convert to DataFrame
df = pd.DataFrame(data, columns=[
'timestamp', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
'taker_buy_quote', 'ignore'
])
# Process columns with proper timezone handling (Binance returns UTC timestamps)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
# Keep in UTC to match COB WebSocket data (no timezone conversion)
# This prevents the 3-hour gap when appending live COB data
for col in ['open', 'high', 'low', 'close', 'volume']:
df[col] = df[col].astype(float)
# Keep only OHLCV columns
df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
df = df.sort_values('timestamp').reset_index(drop=True)
logger.info(f"Binance: Fetched {len(df)} candles for {symbol} {timeframe}")
return df
except Exception as e:
if "451" in str(e) or "Client Error" in str(e):
logger.warning(f"Binance API access blocked (451) for {symbol} {timeframe} - using fallback")
return self._get_fallback_data(symbol, timeframe, limit)
else:
logger.error(f"Error fetching from Binance API: {e}")
return self._get_fallback_data(symbol, timeframe, limit)
def _get_fallback_data(self, symbol: str, timeframe: str, limit: int) -> Optional[pd.DataFrame]:
"""Get fallback data when Binance API is unavailable - REAL DATA ONLY"""
try:
logger.info(f"FALLBACK: Attempting to get real cached data for {symbol} {timeframe}")
# For 1s timeframe, try generating from WebSocket ticks first
if timeframe == '1s':
# logger.info(f"FALLBACK: Attempting to generate 1s candles from WebSocket ticks for {symbol}")
generated_data = self._generate_1s_candles_from_ticks(symbol, limit)
if generated_data is not None and not generated_data.empty:
# logger.info(f"FALLBACK: Generated 1s candles from WebSocket ticks for {symbol}: {len(generated_data)} bars")
return generated_data
# ONLY try cached data
cached_data = self._load_from_cache(symbol, timeframe)
if cached_data is not None and not cached_data.empty:
# Limit to requested amount
limited_data = cached_data.tail(limit) if len(cached_data) > limit else cached_data
logger.info(f"FALLBACK: Using cached real data for {symbol} {timeframe}: {len(limited_data)} bars")
return limited_data
# Try MEXC as secondary real data source
mexc_data = self._fetch_from_mexc(symbol, timeframe, limit)
if mexc_data is not None and not mexc_data.empty:
logger.info(f"FALLBACK: Using MEXC real data for {symbol} {timeframe}: {len(mexc_data)} bars")
return mexc_data
# NO SYNTHETIC DATA - Return None if no real data available
logger.warning(f"FALLBACK: No real data available for {symbol} {timeframe} - waiting for real data")
return None
except Exception as e:
logger.error(f"Error getting fallback data: {e}")
return None
def _add_technical_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""Add comprehensive technical indicators AND pivot-based normalization context"""
try:
df = df.copy()
# Ensure we have enough data for indicators
if len(df) < 50:
logger.warning(f"Insufficient data for comprehensive indicators: {len(df)} rows")
return self._add_basic_indicators(df)
# === EXISTING TECHNICAL INDICATORS ===
# Moving averages (multiple timeframes)
df['sma_10'] = ta.trend.sma_indicator(df['close'], window=10)
df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20)
df['sma_50'] = ta.trend.sma_indicator(df['close'], window=50)
df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12)
df['ema_26'] = ta.trend.ema_indicator(df['close'], window=26)
df['ema_50'] = ta.trend.ema_indicator(df['close'], window=50)
# MACD family
macd = ta.trend.MACD(df['close'])
df['macd'] = macd.macd()
df['macd_signal'] = macd.macd_signal()
df['macd_histogram'] = macd.macd_diff()
# ADX (Average Directional Index)
adx = ta.trend.ADXIndicator(df['high'], df['low'], df['close'])
df['adx'] = adx.adx()
df['adx_pos'] = adx.adx_pos()
df['adx_neg'] = adx.adx_neg()
# Parabolic SAR
psar = ta.trend.PSARIndicator(df['high'], df['low'], df['close'])
df['psar'] = psar.psar()
# === MOMENTUM INDICATORS ===
# RSI (multiple periods) - using our own implementation to avoid ta library deprecation warnings
df['rsi_14'] = self._calculate_rsi(df['close'], period=14)
df['rsi_7'] = self._calculate_rsi(df['close'], period=7)
df['rsi_21'] = self._calculate_rsi(df['close'], period=21)
# Stochastic Oscillator
stoch = ta.momentum.StochasticOscillator(df['high'], df['low'], df['close'])
df['stoch_k'] = stoch.stoch()
df['stoch_d'] = stoch.stoch_signal()
# Williams %R
df['williams_r'] = ta.momentum.williams_r(df['high'], df['low'], df['close'])
# Ultimate Oscillator (used here in place of CCI)
df['ultimate_osc'] = ta.momentum.ultimate_oscillator(df['high'], df['low'], df['close'])
# === VOLATILITY INDICATORS ===
# Bollinger Bands
bollinger = ta.volatility.BollingerBands(df['close'])
df['bb_upper'] = bollinger.bollinger_hband()
df['bb_lower'] = bollinger.bollinger_lband()
df['bb_middle'] = bollinger.bollinger_mavg()
df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['bb_middle']
df['bb_percent'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
# Average True Range
df['atr'] = ta.volatility.average_true_range(df['high'], df['low'], df['close'])
# Keltner Channels
keltner = ta.volatility.KeltnerChannel(df['high'], df['low'], df['close'])
df['keltner_upper'] = keltner.keltner_channel_hband()
df['keltner_lower'] = keltner.keltner_channel_lband()
df['keltner_middle'] = keltner.keltner_channel_mband()
# === VOLUME INDICATORS ===
# Volume moving averages
df['volume_sma_10'] = df['volume'].rolling(window=10).mean()
df['volume_sma_20'] = df['volume'].rolling(window=20).mean()
df['volume_sma_50'] = df['volume'].rolling(window=50).mean()
# On Balance Volume
df['obv'] = ta.volume.on_balance_volume(df['close'], df['volume'])
# Volume Price Trend
df['vpt'] = ta.volume.volume_price_trend(df['close'], df['volume'])
# Money Flow Index
df['mfi'] = ta.volume.money_flow_index(df['high'], df['low'], df['close'], df['volume'])
# Accumulation/Distribution Line
df['ad_line'] = ta.volume.acc_dist_index(df['high'], df['low'], df['close'], df['volume'])
# Volume Weighted Average Price (VWAP)
df['vwap'] = (df['close'] * df['volume']).cumsum() / df['volume'].cumsum()
# === PRICE ACTION INDICATORS ===
# Price position relative to range
df['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low'])
# True Range proxy (ATR is a smoothed average of true range; reuse it here instead of recomputing)
df['true_range'] = df['atr']
# Rate of Change
df['roc'] = ta.momentum.roc(df['close'], window=10)
# === CUSTOM INDICATORS ===
# Trend strength (combination of multiple trend indicators)
df['trend_strength'] = (
(df['close'] > df['sma_20']).astype(int) +
(df['sma_10'] > df['sma_20']).astype(int) +
(df['macd'] > df['macd_signal']).astype(int) +
(df['adx'] > 25).astype(int)
) / 4.0
# Momentum composite
df['momentum_composite'] = (
(df['rsi_14'] / 100) +
(df['stoch_k'] / 100) + # Stochastic %K ranges 0-100
((df['williams_r'] + 100) / 100) # Williams %R ranges -100 to 0
) / 3.0
# Volatility regime
df['volatility_regime'] = (df['atr'] / df['close']).rolling(window=20).rank(pct=True)
# === WILLIAMS MARKET STRUCTURE PIVOT CONTEXT ===
# Check if we need to refresh pivot bounds for this symbol
symbol = self._extract_symbol_from_dataframe(df)
if symbol and self._should_refresh_pivot_bounds(symbol):
logger.info(f"Refreshing pivot bounds for {symbol}")
self._refresh_pivot_bounds_for_symbol(symbol)
# Add pivot-based context features
if symbol and symbol in self.pivot_bounds:
df = self._add_pivot_context_features(df, symbol)
# === FILL NaN VALUES ===
# Forward fill first, then backward fill, then zero fill
df = df.ffill().bfill().fillna(0)
logger.debug(f"Added technical indicators + pivot context for {len(df)} rows")
return df
except Exception as e:
logger.error(f"Error adding comprehensive technical indicators: {e}")
# Fallback to basic indicators
return self._add_basic_indicators(df)
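# The RSI columns above come from this module's _calculate_rsi helper (not shown in this
# excerpt). A minimal sketch of a standard simple-moving-average RSI, for reference only
# and not necessarily identical to that helper:
#     def rsi(close: pd.Series, period: int = 14) -> pd.Series:
#         delta = close.diff()
#         gain = delta.clip(lower=0).rolling(period).mean()
#         loss = (-delta.clip(upper=0)).rolling(period).mean()
#         rs = gain / loss
#         return 100 - (100 / (1 + rs))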
# === WILLIAMS MARKET STRUCTURE PIVOT SYSTEM ===
def _collect_monthly_1m_data(self, symbol: str) -> Optional[pd.DataFrame]:
"""Collect 30 days of 1m candles with smart gap-filling cache system"""
try:
# Check for cached data and determine what we need to fetch
cached_data = self._load_monthly_data_from_cache(symbol)
import pytz
utc = pytz.UTC
end_time = datetime.utcnow().replace(tzinfo=utc)
start_time = end_time - timedelta(days=30)
if cached_data is not None and not cached_data.empty:
logger.info(f"Found cached monthly 1m data for {symbol}: {len(cached_data)} candles")
# Check cache data range
cache_start = cached_data['timestamp'].min()
cache_end = cached_data['timestamp'].max()
logger.info(f"Cache range: {cache_start} to {cache_end}")
# Remove data older than 30 days
cached_data = cached_data[cached_data['timestamp'] >= start_time]
# Check if we need to fill gaps
gap_start = cache_end + timedelta(minutes=1)
# Ensure gap_start is timezone-aware in UTC so it compares correctly with end_time
if gap_start.tzinfo is None:
gap_start = utc.localize(gap_start)
else:
gap_start = gap_start.astimezone(utc)
if gap_start < end_time:
# Need to fill gap from cache_end to now
logger.info(f"Filling gap from {gap_start} to {end_time}")
gap_data = self._fetch_1m_data_range(symbol, gap_start, end_time)
if gap_data is not None and not gap_data.empty:
# Combine cached data with gap data
monthly_df = pd.concat([cached_data, gap_data], ignore_index=True)
monthly_df = monthly_df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True)
logger.info(f"Combined cache + gap: {len(monthly_df)} total candles")
else:
monthly_df = cached_data
logger.info(f"Using cached data only: {len(monthly_df)} candles")
else:
monthly_df = cached_data
logger.info(f"Cache is up to date: {len(monthly_df)} candles")
else:
# No cache - fetch full 30 days
logger.info(f"No cache found, collecting full 30 days of 1m data for {symbol}")
monthly_df = self._fetch_1m_data_range(symbol, start_time, end_time)
if monthly_df is not None and not monthly_df.empty:
# Final cleanup: ensure exactly 30 days
monthly_df = monthly_df[monthly_df['timestamp'] >= start_time]
monthly_df = monthly_df.sort_values('timestamp').reset_index(drop=True)
logger.info(f"Final dataset: {len(monthly_df)} 1m candles for {symbol}")
# Update cache
self._save_monthly_data_to_cache(symbol, monthly_df)
return monthly_df
else:
logger.error(f"No monthly 1m data collected for {symbol}")
return None
except Exception as e:
logger.error(f"Error collecting monthly 1m data for {symbol}: {e}")
return None
def _fetch_1s_batch_with_endtime(self, symbol: str, end_time: datetime, limit: int = 1000) -> Optional[pd.DataFrame]:
"""Fetch a batch of 1s candles ending at specific time"""
try:
binance_symbol = symbol.replace('/', '').upper()
# Convert end_time to milliseconds
end_ms = int(end_time.timestamp() * 1000)
# API request
url = "https://api.binance.com/api/v3/klines"
params = {
'symbol': binance_symbol,
'interval': '1s',
'endTime': end_ms,
'limit': limit
}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json'
}
response = requests.get(url, params=params, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
if not data:
return None
# Convert to DataFrame
df = pd.DataFrame(data, columns=[
'timestamp', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
'taker_buy_quote', 'ignore'
])
# Process columns with proper timezone handling
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', utc=True)
# Keep in UTC to match COB WebSocket data (no timezone conversion)
for col in ['open', 'high', 'low', 'close', 'volume']:
df[col] = df[col].astype(float)
# Keep only OHLCV columns
df = df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
return df
except Exception as e:
logger.error(f"Error fetching 1s batch for {symbol}: {e}")
return None
def _fetch_1m_data_range(self, symbol: str, start_time: datetime, end_time: datetime) -> Optional[pd.DataFrame]:
"""Fetch 1m candles for a specific time range with efficient batching"""
try:
# Convert symbol format for Binance API
if '/' in symbol:
api_symbol = symbol.replace('/', '')
else:
api_symbol = symbol
logger.info(f"Fetching 1m data for {symbol} from {start_time} to {end_time}")
all_candles = []
current_start = start_time
batch_size = 1000 # Binance limit
api_calls_made = 0
while current_start < end_time and api_calls_made < 50: # Safety limit for 30 days
try:
# Calculate end time for this batch
batch_end = min(current_start + timedelta(minutes=batch_size), end_time)
# Convert to milliseconds
start_timestamp = int(current_start.timestamp() * 1000)
end_timestamp = int(batch_end.timestamp() * 1000)
# Binance API call
url = "https://api.binance.com/api/v3/klines"
params = {
'symbol': api_symbol,
'interval': '1m',
'startTime': start_timestamp,
'endTime': end_timestamp,
'limit': batch_size
}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json'
}
response = requests.get(url, params=params, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
api_calls_made += 1
if not data:
logger.warning(f"No data returned for batch {current_start} to {batch_end}")
break
# Convert to DataFrame
batch_df = pd.DataFrame(data, columns=[
'timestamp', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
'taker_buy_quote', 'ignore'
])
# Process columns with proper timezone handling
batch_df['timestamp'] = pd.to_datetime(batch_df['timestamp'], unit='ms', utc=True)
# Keep in UTC to match COB WebSocket data (no timezone conversion)
for col in ['open', 'high', 'low', 'close', 'volume']:
batch_df[col] = batch_df[col].astype(float)
# Keep only OHLCV columns
batch_df = batch_df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
all_candles.append(batch_df)
# Move to next batch (add 1 minute to avoid overlap)
current_start = batch_end + timedelta(minutes=1)
# Rate limiting (Binance allows 1200/min)
time.sleep(0.05) # 50ms delay
# Progress logging
if api_calls_made % 10 == 0:
total_candles = sum(len(df) for df in all_candles)
logger.info(f"Progress: {api_calls_made} API calls, {total_candles} candles collected")
except Exception as e:
logger.error(f"Error in batch {current_start} to {batch_end}: {e}")
current_start += timedelta(minutes=batch_size)
time.sleep(1) # Wait longer on error
continue
if not all_candles:
logger.error(f"No data collected for {symbol}")
return None
# Combine all batches
df = pd.concat(all_candles, ignore_index=True)
df = df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True)
logger.info(f"Successfully fetched {len(df)} 1m candles for {symbol} ({api_calls_made} API calls)")
return df
except Exception as e:
logger.error(f"Error fetching 1m data range for {symbol}: {e}")
return None
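# Sizing sketch for the batching above (illustrative comment, not executed):
#   minutes_in_window = 30 * 24 * 60             # 43,200 one-minute candles in 30 days
#   calls_needed = -(-minutes_in_window // 1000)  # ceil division -> 44 requests of 1000 klines
# so a full 30-day backfill fits under the api_calls_made < 50 safety cap, and the 50 ms
# sleep per request keeps the rate at or below the 1200 requests/min limit noted above.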
def _extract_pivot_bounds_from_monthly_data(self, symbol: str, monthly_data: pd.DataFrame) -> Optional[PivotBounds]:
"""Extract pivot bounds using Williams Market Structure analysis"""
try:
logger.info(f"Analyzing {len(monthly_data)} candles for pivot extraction...")
# Convert DataFrame to numpy array format expected by Williams Market Structure
ohlcv_array = monthly_data[['timestamp', 'open', 'high', 'low', 'close', 'volume']].copy()
# Convert timestamp to numeric for Williams analysis
ohlcv_array['timestamp'] = ohlcv_array['timestamp'].astype(np.int64) // 10**9 # Convert to seconds
ohlcv_array = ohlcv_array.to_numpy()
# Initialize Williams Market Structure analyzer
try:
williams = WilliamsMarketStructure(1)
# Calculate 5 levels of recursive pivot points
logger.info("Running Williams Market Structure analysis...")
pivot_levels = williams.calculate_recursive_pivot_points(ohlcv_array)
except ImportError:
logger.warning("Williams Market Structure not available, using simplified pivot detection")
pivot_levels = self._simple_pivot_detection(monthly_data)
# Extract bounds from pivot analysis
bounds = self._extract_bounds_from_pivot_levels(symbol, monthly_data, pivot_levels)
return bounds
except Exception as e:
logger.error(f"Error extracting pivot bounds for {symbol}: {e}")
return None
def _extract_bounds_from_pivot_levels(self, symbol: str, monthly_data: pd.DataFrame,
pivot_levels: Dict[str, Any]) -> PivotBounds:
"""Extract normalization bounds from Williams pivot levels"""
try:
# Initialize bounds
price_max = monthly_data['high'].max()
price_min = monthly_data['low'].min()
volume_max = monthly_data['volume'].max()
volume_min = monthly_data['volume'].min()
support_levels = []
resistance_levels = []
# Extract pivot points from all Williams levels
for level_key, level_data in pivot_levels.items():
if level_data and hasattr(level_data, 'swing_points') and level_data.swing_points:
# Get prices from swing points
level_prices = [sp.price for sp in level_data.swing_points]
# Update overall price bounds
price_max = max(price_max, max(level_prices))
price_min = min(price_min, min(level_prices))
# Extract support and resistance levels
if hasattr(level_data, 'support_levels') and level_data.support_levels:
support_levels.extend(level_data.support_levels)
if hasattr(level_data, 'resistance_levels') and level_data.resistance_levels:
resistance_levels.extend(level_data.resistance_levels)
# Remove duplicates and sort
support_levels = sorted(list(set(support_levels)))
resistance_levels = sorted(list(set(resistance_levels)))
# Extract trend context from pivot levels
pivot_context = {
'nested_levels': len(pivot_levels),
'level_details': {}
}
# Get trend info from primary level (level_0)
if 'level_0' in pivot_levels and pivot_levels['level_0']:
level_0 = pivot_levels['level_0']
pivot_context['trend_direction'] = getattr(level_0, 'trend_direction', 'UNKNOWN')
pivot_context['trend_strength'] = getattr(level_0, 'trend_strength', 0.0)
else:
pivot_context['trend_direction'] = 'UNKNOWN'
pivot_context['trend_strength'] = 0.0
# Add details for each level
for level_key, level_data in pivot_levels.items():
if level_data:
level_info = {
'swing_points_count': len(getattr(level_data, 'swing_points', [])),
'support_levels_count': len(getattr(level_data, 'support_levels', [])),
'resistance_levels_count': len(getattr(level_data, 'resistance_levels', [])),
'trend_direction': getattr(level_data, 'trend_direction', 'UNKNOWN'),
'trend_strength': getattr(level_data, 'trend_strength', 0.0)
}
pivot_context['level_details'][level_key] = level_info
# Create PivotBounds object
bounds = PivotBounds(
symbol=symbol,
price_max=float(price_max),
price_min=float(price_min),
volume_max=float(volume_max),
volume_min=float(volume_min),
pivot_support_levels=support_levels,
pivot_resistance_levels=resistance_levels,
pivot_context=pivot_context,
created_timestamp=datetime.now(),
data_period_start=monthly_data['timestamp'].min(),
data_period_end=monthly_data['timestamp'].max(),
total_candles_analyzed=len(monthly_data)
)
logger.info(f"Extracted pivot bounds for {symbol}:")
logger.info(f" Price range: ${bounds.price_min:.2f} - ${bounds.price_max:.2f}")
logger.info(f" Volume range: {bounds.volume_min:.2f} - {bounds.volume_max:.2f}")
logger.info(f" Support levels: {len(bounds.pivot_support_levels)}")
logger.info(f" Resistance levels: {len(bounds.pivot_resistance_levels)}")
return bounds
except Exception as e:
logger.error(f"Error extracting bounds from pivot levels: {e}")
# Fallback to simple min/max bounds
return PivotBounds(
symbol=symbol,
price_max=float(monthly_data['high'].max()),
price_min=float(monthly_data['low'].min()),
volume_max=float(monthly_data['volume'].max()),
volume_min=float(monthly_data['volume'].min()),
pivot_support_levels=[],
pivot_resistance_levels=[],
pivot_context={},
created_timestamp=datetime.now(),
data_period_start=monthly_data['timestamp'].min(),
data_period_end=monthly_data['timestamp'].max(),
total_candles_analyzed=len(monthly_data)
)
def _simple_pivot_detection(self, monthly_data: pd.DataFrame) -> Dict[str, Any]:
"""Simple pivot detection fallback when Williams Market Structure is not available"""
try:
# Simple high/low pivot detection using rolling windows
highs = monthly_data['high']
lows = monthly_data['low']
# Find local maxima and minima using different windows
pivot_highs = []
pivot_lows = []
for window in [5, 10, 20, 50]:
if len(monthly_data) > window * 2:
# Rolling max/min detection
rolling_max = highs.rolling(window=window, center=True).max()
rolling_min = lows.rolling(window=window, center=True).min()
# Find pivot highs (local maxima)
high_pivots = monthly_data[highs == rolling_max]['high'].tolist()
pivot_highs.extend(high_pivots)
# Find pivot lows (local minima)
low_pivots = monthly_data[lows == rolling_min]['low'].tolist()
pivot_lows.extend(low_pivots)
# Create proper pivot level structure
pivot_level = SimplePivotLevel(
swing_points=[],
support_levels=list(set(pivot_lows)),
resistance_levels=list(set(pivot_highs))
)
return {'level_0': pivot_level}
except Exception as e:
logger.error(f"Error in simple pivot detection: {e}")
return {}
def _should_refresh_pivot_bounds(self, symbol: str) -> bool:
"""Check if pivot bounds need refreshing"""
try:
if symbol not in self.pivot_bounds:
return True
bounds = self.pivot_bounds[symbol]
age = datetime.now() - bounds.created_timestamp
return age > self.pivot_refresh_interval
except Exception as e:
logger.error(f"Error checking pivot bounds refresh: {e}")
return True
def _refresh_pivot_bounds_for_symbol(self, symbol: str):
"""Refresh pivot bounds for a specific symbol"""
try:
# Collect monthly 1m data
monthly_data = self._collect_monthly_1m_data(symbol)
if monthly_data is None or monthly_data.empty:
logger.warning(f"Could not collect monthly data for {symbol}")
return
# Extract pivot bounds
bounds = self._extract_pivot_bounds_from_monthly_data(symbol, monthly_data)
if bounds is None:
logger.warning(f"Could not extract pivot bounds for {symbol}")
return
# Store bounds
self.pivot_bounds[symbol] = bounds
# Save to cache
self._save_pivot_bounds_to_cache(symbol, bounds)
logger.info(f"Successfully refreshed pivot bounds for {symbol}")
except Exception as e:
logger.error(f"Error refreshing pivot bounds for {symbol}: {e}")
def _add_pivot_context_features(self, df: pd.DataFrame, symbol: str) -> pd.DataFrame:
"""Add pivot-derived context features for normalization"""
try:
if symbol not in self.pivot_bounds:
logger.warning("Pivot bounds missing for %s; access will be blocked until real data is ready (guideline: no stubs)", symbol)
return df
bounds = self.pivot_bounds[symbol]
current_prices = df['close']
# Distance to nearest support/resistance levels (normalized)
df['pivot_support_distance'] = current_prices.apply(bounds.get_nearest_support_distance)
df['pivot_resistance_distance'] = current_prices.apply(bounds.get_nearest_resistance_distance)
# Price position within pivot range (0 = price_min, 1 = price_max)
df['pivot_price_position'] = current_prices.apply(bounds.normalize_price).clip(0, 1)
# Add binary features for proximity to key levels
price_range = bounds.get_price_range()
proximity_threshold = price_range * 0.02 # 2% of price range
# Flag rows whose close is within the proximity threshold of any support/resistance level
df['near_pivot_support'] = current_prices.apply(
lambda p: 1 if any(abs(p - s) <= proximity_threshold for s in bounds.pivot_support_levels) else 0
)
df['near_pivot_resistance'] = current_prices.apply(
lambda p: 1 if any(abs(p - r) <= proximity_threshold for r in bounds.pivot_resistance_levels) else 0
)
logger.debug(f"Added pivot context features for {symbol}")
return df
except Exception as e:
logger.warning(f"Error adding pivot context features for {symbol}: {e}")
return df
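# Example usage (illustrative sketch; assumes an initialized provider `dp` whose
# pivot bounds for the symbol are already cached):
#   df = dp.get_historical_data('ETH/USDT', '1m', limit=100)
#   df = dp._add_pivot_context_features(df, 'ETH/USDT')
#   df[['pivot_support_distance', 'pivot_resistance_distance', 'pivot_price_position',
#       'near_pivot_support', 'near_pivot_resistance']].tail()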
def _extract_symbol_from_dataframe(self, df: pd.DataFrame) -> Optional[str]:
"""Extract symbol from dataframe context (basic implementation)"""
# Simple implementation: the symbol is not passed explicitly and the dataframe carries
# no symbol metadata, so default to the first configured symbol (can be improved)
return self.symbols[0] if self.symbols else None
# === CACHE MANAGEMENT ===
def _auto_fix_corrupted_cache(self):
"""Automatically fix corrupted cache files on startup"""
try:
from utils.cache_manager import get_cache_manager
cache_manager = get_cache_manager()
# Quick health check
health_summary = cache_manager.get_cache_summary()
if health_summary['corrupted_files'] > 0:
logger.warning(f"Found {health_summary['corrupted_files']} corrupted cache files, cleaning up...")
# Auto-cleanup corrupted files (no confirmation needed)
deleted_files = cache_manager.cleanup_corrupted_files(dry_run=False)
deleted_count = 0
for cache_dir, files in deleted_files.items():
for file_info in files:
if "DELETED:" in file_info:
deleted_count += 1
logger.info(f"Auto-cleaned {deleted_count} corrupted cache files")
else:
logger.debug("Cache health check passed - no corrupted files found")
except Exception as e:
logger.warning(f"Cache auto-fix failed: {e}")
# === PIVOT BOUNDS CACHING ===
def _load_all_pivot_bounds(self):
"""Load all cached pivot bounds on startup"""
try:
for symbol in self.symbols:
bounds = self._load_pivot_bounds_from_cache(symbol)
if bounds:
self.pivot_bounds[symbol] = bounds
logger.info(f"Loaded cached pivot bounds for {symbol}")
except Exception as e:
logger.error(f"Error loading pivot bounds from cache: {e}")
def _load_pivot_bounds_from_cache(self, symbol: str) -> Optional[PivotBounds]:
"""Load pivot bounds from cache"""
try:
cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl"
if cache_file.exists():
with open(cache_file, 'rb') as f:
bounds = pickle.load(f)
# Check if bounds are still valid (not too old)
age = datetime.now() - bounds.created_timestamp
if age <= self.pivot_refresh_interval:
return bounds
else:
logger.info(f"Cached pivot bounds for {symbol} are too old ({age.days} days)")
return None
except Exception as e:
logger.warning(f"Error loading pivot bounds from cache for {symbol}: {e}")
return None
def _save_pivot_bounds_to_cache(self, symbol: str, bounds: PivotBounds):
"""Save pivot bounds to cache"""
try:
cache_file = self.pivot_cache_dir / f"{symbol.replace('/', '')}_pivot_bounds.pkl"
with open(cache_file, 'wb') as f:
pickle.dump(bounds, f)
logger.debug(f"Saved pivot bounds to cache for {symbol}")
except Exception as e:
logger.warning(f"Error saving pivot bounds to cache for {symbol}: {e}")
def _load_monthly_data_from_cache(self, symbol: str) -> Optional[pd.DataFrame]:
"""Load monthly 1m data from cache"""
try:
cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet"
if cache_file.exists():
try:
df = pd.read_parquet(cache_file)
# Ensure cached monthly data has proper timezone (UTC to match COB WebSocket data)
if not df.empty and 'timestamp' in df.columns:
if df['timestamp'].dt.tz is None:
# If no timezone info, assume UTC and keep in UTC
df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)
elif str(df['timestamp'].dt.tz) != 'UTC':
# Convert to UTC if different timezone
df['timestamp'] = df['timestamp'].dt.tz_convert('UTC')
logger.info(f"Loaded {len(df)} 1m candles from cache for {symbol}")
return df
except Exception as parquet_e:
# Handle corrupted Parquet file - expanded error detection
error_str = str(parquet_e).lower()
corrupted_indicators = [
"parquet magic bytes not found",
"corrupted",
"couldn't deserialize thrift",
"don't know what type",
"invalid parquet file",
"unexpected end of file",
"invalid metadata"
]
if any(indicator in error_str for indicator in corrupted_indicators):
logger.warning(f"Corrupted Parquet cache file for {symbol}, removing and returning None: {parquet_e}")
try:
cache_file.unlink() # Delete corrupted file
logger.info(f"Deleted corrupted monthly cache file: {cache_file}")
except Exception as delete_e:
logger.error(f"Failed to delete corrupted monthly cache file: {delete_e}")
return None
else:
raise parquet_e
return None
except Exception as e:
logger.warning(f"Error loading monthly data from cache for {symbol}: {e}")
return None
def _save_monthly_data_to_cache(self, symbol: str, df: pd.DataFrame):
"""Save monthly 1m data to cache"""
try:
cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet"
df.to_parquet(cache_file, index=False)
logger.info(f"Saved {len(df)} monthly 1m candles to cache for {symbol}")
except Exception as e:
logger.warning(f"Error saving monthly data to cache for {symbol}: {e}")
def get_pivot_bounds(self, symbol: str) -> Optional[PivotBounds]:
"""Get pivot bounds for a symbol"""
return self.pivot_bounds.get(symbol)
def get_pivot_normalized_features(self, symbol: str, df: pd.DataFrame) -> Optional[pd.DataFrame]:
"""Get dataframe with pivot-normalized features"""
try:
if symbol not in self.pivot_bounds:
logger.warning(f"No pivot bounds available for {symbol}")
return df
bounds = self.pivot_bounds[symbol]
normalized_df = df.copy()
# Normalize price columns using pivot bounds
price_range = bounds.get_price_range()
for col in ['open', 'high', 'low', 'close']:
if col in normalized_df.columns:
normalized_df[col] = (normalized_df[col] - bounds.price_min) / price_range
# Normalize volume using pivot bounds
volume_range = bounds.volume_max - bounds.volume_min
if volume_range > 0 and 'volume' in normalized_df.columns:
normalized_df['volume'] = (normalized_df['volume'] - bounds.volume_min) / volume_range
return normalized_df
except Exception as e:
logger.error(f"Error applying pivot normalization for {symbol}: {e}")
return df
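# Normalization applied above, for reference: each price column p becomes
#   p_norm = (p - price_min) / (price_max - price_min)
# and volume is scaled by (volume_max - volume_min), so values land in [0, 1]
# relative to the 30-day pivot range rather than the local window.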
def build_base_data_input(self, symbol: str) -> Optional['BaseDataInput']:
"""
Build BaseDataInput from cached data (optimized for speed)
Args:
symbol: Trading symbol
Returns:
BaseDataInput with consistent data structure
"""
try:
from .data_models import BaseDataInput
# Get OHLCV data directly from optimized cache (no validation checks for speed)
ohlcv_1s_list = self._get_cached_ohlcv_bars(symbol, '1s', 300)
ohlcv_1m_list = self._get_cached_ohlcv_bars(symbol, '1m', 300)
ohlcv_1h_list = self._get_cached_ohlcv_bars(symbol, '1h', 300)
ohlcv_1d_list = self._get_cached_ohlcv_bars(symbol, '1d', 300)
# Get BTC reference data
btc_symbol = 'BTC/USDT'
btc_ohlcv_1s_list = self._get_cached_ohlcv_bars(btc_symbol, '1s', 300)
if not btc_ohlcv_1s_list:
# Use ETH data as fallback
btc_ohlcv_1s_list = ohlcv_1s_list
# Get cached data (fast lookups)
technical_indicators = self._get_latest_technical_indicators(symbol)
cob_data = self._get_latest_cob_data_object(symbol)
last_predictions = {}
# Build BaseDataInput (no validation for speed - assume data is good)
base_data = BaseDataInput(
symbol=symbol,
timestamp=datetime.now(),
ohlcv_1s=ohlcv_1s_list,
ohlcv_1m=ohlcv_1m_list,
ohlcv_1h=ohlcv_1h_list,
ohlcv_1d=ohlcv_1d_list,
btc_ohlcv_1s=btc_ohlcv_1s_list,
technical_indicators=technical_indicators,
cob_data=cob_data,
last_predictions=last_predictions
)
return base_data
except Exception as e:
logger.error(f"Error building BaseDataInput for {symbol}: {e}")
return None
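# Example usage (illustrative sketch; `dp` is an initialized provider with warm caches):
#   base = dp.build_base_data_input('ETH/USDT')
#   if base is not None:
#       print(len(base.ohlcv_1m), len(base.btc_ohlcv_1s))  # up to 300 bars per timeframe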
def _get_cached_ohlcv_bars(self, symbol: str, timeframe: str, max_count: int) -> List['OHLCVBar']:
"""Get OHLCV data list from cached data"""
try:
from .data_models import OHLCVBar
data_list = []
# Get cached data
if symbol in self.cached_data and timeframe in self.cached_data[symbol]:
cached_df = self.cached_data[symbol][timeframe]
if not cached_df.empty:
# Convert cached data to OHLCVBar objects
for idx, row in cached_df.tail(max_count).iterrows():
bar = OHLCVBar(
symbol=symbol,
timestamp=idx if hasattr(idx, 'to_pydatetime') else datetime.now(),
open=float(row['open']),
high=float(row['high']),
low=float(row['low']),
close=float(row['close']),
volume=float(row['volume']),
timeframe=timeframe
)
data_list.append(bar)
return data_list
except Exception as e:
logger.error(f"Error getting cached OHLCV bars for {symbol}/{timeframe}: {e}")
return []
def _get_latest_technical_indicators(self, symbol: str) -> Dict[str, float]:
"""Get latest technical indicators for a symbol"""
try:
# Get latest data and calculate indicators
df = self.get_historical_data(symbol, '1h', limit=50)
if df is not None and not df.empty:
df_with_indicators = self._add_technical_indicators(df)
if not df_with_indicators.empty:
# Return the latest indicators as a dict
latest_row = df_with_indicators.iloc[-1]
indicators = {}
for col in df_with_indicators.columns:
if col not in ['open', 'high', 'low', 'close', 'volume', 'timestamp']:
indicators[col] = float(latest_row[col]) if pd.notna(latest_row[col]) else 0.0
return indicators
return {}
except Exception as e:
logger.error(f"Error getting technical indicators for {symbol}: {e}")
return {}
def _get_latest_cob_data_object(self, symbol: str) -> Optional['COBData']:
"""Get latest COB data as COBData object"""
try:
from .data_models import COBData
# Get latest COB data from cache
cob_data = self.get_latest_cob_data(symbol)
if cob_data:
# Determine current price (prefer explicit field, fallback to stats.mid_price)
stats = cob_data.get('stats', {}) if isinstance(cob_data.get('stats', {}), dict) else {}
current_price = cob_data.get('current_price') or stats.get('mid_price', 0.0)
bucket_size = 1.0 if 'ETH' in symbol else 10.0
# Ensure price buckets exist; compute from bids/asks if missing
price_buckets = cob_data.get('price_buckets') or {}
if (not price_buckets) and current_price:
price_buckets = self._compute_price_buckets_from_snapshot(
current_price=current_price,
bucket_size=bucket_size,
bids=cob_data.get('bids', []),
asks=cob_data.get('asks', [])
)
# Build imbalance map (price -> imbalance) if not provided
bid_ask_imbalance = cob_data.get('bid_ask_imbalance') or {}
if not bid_ask_imbalance and price_buckets:
tmp = {}
for price, bucket in price_buckets.items():
bid_vol = float(bucket.get('bid_volume', 0.0) or 0.0)
ask_vol = float(bucket.get('ask_volume', 0.0) or 0.0)
denom = bid_vol + ask_vol
tmp[price] = (bid_vol - ask_vol) / denom if denom > 0 else 0.0
bid_ask_imbalance = tmp
return COBData(
symbol=symbol,
timestamp=datetime.now(),
current_price=float(current_price or 0.0),
bucket_size=bucket_size,
price_buckets=price_buckets,
bid_ask_imbalance=bid_ask_imbalance,
volume_weighted_prices=cob_data.get('volume_weighted_prices', {}),
order_flow_metrics=cob_data.get('order_flow_metrics', {}),
ma_1s_imbalance=cob_data.get('ma_1s_imbalance', {}),
ma_5s_imbalance=cob_data.get('ma_5s_imbalance', {}),
ma_15s_imbalance=cob_data.get('ma_15s_imbalance', {}),
ma_60s_imbalance=cob_data.get('ma_60s_imbalance', {})
)
return None
except Exception as e:
logger.error(f"Error getting COB data object for {symbol}: {e}")
return None
def _compute_price_buckets_from_snapshot(
self,
current_price: float,
bucket_size: float,
bids: List[List[float]],
asks: List[List[float]]
) -> Dict[float, Dict[str, float]]:
"""Compute ±20 price buckets around current price from raw bids/asks.
Returns dict: price -> {bid_volume, ask_volume, total_volume, imbalance}
"""
try:
# Initialize bucket map for ±20 buckets
bucket_map: Dict[float, Dict[str, float]] = {}
if not current_price or bucket_size <= 0:
return bucket_map
# Center-aligned bucket prices
bucket_count = 20
for i in range(-bucket_count, bucket_count + 1):
price = (round(current_price / bucket_size) * bucket_size) + (i * bucket_size)
bucket_map[price] = {
'bid_volume': 0.0,
'ask_volume': 0.0,
'total_volume': 0.0,
'imbalance': 0.0,
}
# Aggregate bids
for level in (bids or [])[:200]:
try:
price, size = float(level[0]), float(level[1])
except Exception:
continue
bucket_price = round(price / bucket_size) * bucket_size
if bucket_price in bucket_map:
bucket_map[bucket_price]['bid_volume'] += size
# Aggregate asks
for level in (asks or [])[:200]:
try:
price, size = float(level[0]), float(level[1])
except Exception:
continue
bucket_price = round(price / bucket_size) * bucket_size
if bucket_price in bucket_map:
bucket_map[bucket_price]['ask_volume'] += size
# Compute totals and imbalance
for price, bucket in bucket_map.items():
bid_vol = float(bucket['bid_volume'])
ask_vol = float(bucket['ask_volume'])
total = bid_vol + ask_vol
bucket['total_volume'] = total
bucket['imbalance'] = (bid_vol - ask_vol) / total if total > 0 else 0.0
return bucket_map
except Exception as e:
logger.debug(f"Error computing price buckets: {e}")
return {}
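# Bucketing example (illustrative): with current_price=3007.4 and bucket_size=1.0 the
# center bucket is round(3007.4 / 1.0) * 1.0 = 3007.0, the map spans 2987.0 .. 3027.0
# (+/-20 buckets), and a bid at 3006.7 aggregates into the 3007.0 bucket.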
def _add_basic_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""Add basic indicators for small datasets"""
try:
df = df.copy()
# Basic moving averages
if len(df) >= 20:
df['sma_20'] = ta.trend.sma_indicator(df['close'], window=20)
df['ema_12'] = ta.trend.ema_indicator(df['close'], window=12)
# Basic RSI - using our own implementation to avoid ta library deprecation warnings
if len(df) >= 14:
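# Note: _calculate_rsi returns the latest RSI as a single scalar, so pandas
# broadcasts that one value across every row of this column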
df['rsi_14'] = self._calculate_rsi(df['close'], period=14)
# Basic volume indicators
if len(df) >= 10:
df['volume_sma_10'] = df['volume'].rolling(window=10).mean()
# Basic price action
df['price_position'] = (df['close'] - df['low']) / (df['high'] - df['low'])
df['price_position'] = df['price_position'].fillna(0.5) # Default to middle
# Fill NaN values
df = df.ffill().bfill().fillna(0)
return df
except Exception as e:
logger.error(f"Error adding basic indicators: {e}")
return df
def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> float:
"""Calculate RSI (Relative Strength Index) - custom implementation to avoid ta library deprecation warnings"""
try:
if len(prices) < period + 1:
return 50.0 # Default neutral value
# Calculate price changes
delta = prices.diff()
# Separate gains and losses
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
# Calculate RS and RSI
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
# Return the last value, or 50 if NaN
last_rsi = rsi.iloc[-1]
return float(last_rsi) if not pd.isna(last_rsi) else 50.0
except Exception as e:
logger.debug(f"Error calculating RSI: {e}")
return 50.0 # Default neutral value
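# Worked example (illustrative): with a 14-period average gain of 1.2 and average
# loss of 0.8, RS = 1.2 / 0.8 = 1.5 and RSI = 100 - 100 / (1 + 1.5) = 60.0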
def _load_from_cache(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
"""Load data from cache"""
try:
cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet"
if cache_file.exists():
# Check if cache is recent - stricter rules for startup
cache_age = time.time() - cache_file.stat().st_mtime
# For 1m data, use cache only if less than 5 minutes old to avoid gaps
if timeframe == '1m':
max_age = 300 # 5 minutes
else:
max_age = 3600 # 1 hour for other timeframes
if cache_age < max_age:
try:
df = pd.read_parquet(cache_file)
# Ensure cached data has proper timezone (UTC to match COB WebSocket data)
if not df.empty and 'timestamp' in df.columns:
if df['timestamp'].dt.tz is None:
# If no timezone info, assume UTC and keep in UTC
df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)
elif str(df['timestamp'].dt.tz) != 'UTC':
# Convert to UTC if different timezone
df['timestamp'] = df['timestamp'].dt.tz_convert('UTC')
logger.debug(f"Loaded {len(df)} rows from cache for {symbol} {timeframe} (age: {cache_age/60:.1f}min)")
return df
except Exception as parquet_e:
# Handle corrupted Parquet file - expanded error detection
error_str = str(parquet_e).lower()
corrupted_indicators = [
"parquet magic bytes not found",
"corrupted",
"couldn't deserialize thrift",
"don't know what type",
"invalid parquet file",
"unexpected end of file",
"invalid metadata"
]
if any(indicator in error_str for indicator in corrupted_indicators):
logger.warning(f"Corrupted Parquet cache file for {symbol} {timeframe}, removing and returning None: {parquet_e}")
try:
cache_file.unlink() # Delete corrupted file
logger.info(f"Deleted corrupted cache file: {cache_file}")
except Exception as delete_e:
logger.error(f"Failed to delete corrupted cache file: {delete_e}")
return None
else:
raise parquet_e
else:
logger.debug(f"Cache for {symbol} {timeframe} is too old ({cache_age/60:.1f}min > {max_age/60:.1f}min)")
return None
except Exception as e:
logger.warning(f"Error loading cache for {symbol} {timeframe}: {e}")
return None
def _save_to_cache(self, df: pd.DataFrame, symbol: str, timeframe: str):
"""Save data to cache"""
try:
cache_file = self.cache_dir / f"{symbol.replace('/', '')}_{timeframe}.parquet"
df.to_parquet(cache_file, index=False)
logger.debug(f"Saved {len(df)} rows to cache for {symbol} {timeframe}")
except Exception as e:
logger.warning(f"Error saving cache for {symbol} {timeframe}: {e}")
async def start_real_time_streaming(self):
"""Start real-time data streaming using COBIntegration"""
if self.is_streaming:
logger.warning("Real-time streaming already active")
return
self.is_streaming = True
logger.info("Starting real-time streaming via COBIntegration")
# COBIntegration is started in the constructor
async def stop_real_time_streaming(self):
"""Stop real-time data streaming"""
if not self.is_streaming:
return
logger.info("Stopping Enhanced COB WebSocket streaming")
self.is_streaming = False
# Stop COB Integration
if self.cob_integration:
try:
await self.cob_integration.stop()
logger.info("COB Integration stopped")
except Exception as e:
logger.error(f"Error stopping COB Integration: {e}")
# Stop Enhanced COB WebSocket
if self.enhanced_cob_websocket:
try:
await self.enhanced_cob_websocket.stop()
self.enhanced_cob_websocket = None
logger.info("Enhanced COB WebSocket stopped")
except Exception as e:
logger.error(f"Error stopping Enhanced COB WebSocket: {e}")
# Cancel any remaining WebSocket tasks
for symbol, task in self.websocket_tasks.items():
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
self.websocket_tasks.clear()
# === COB DATA ACCESS METHODS ===
def get_cob_raw_ticks(self, symbol: str, count: int = 1000) -> List[Dict]:
"""Get raw COB ticks for a symbol (up to 15 minutes of data)"""
try:
if symbol in self.cob_raw_ticks:
return list(self.cob_raw_ticks[symbol])[-count:]
return []
except Exception as e:
logger.error(f"Error getting COB raw ticks for {symbol}: {e}")
return []
def get_cob_1s_aggregated(self, symbol: str, count: int = 300) -> List[Dict]:
"""Get 1s aggregated COB data with $1 price buckets"""
try:
if symbol in self.cob_1s_aggregated:
return list(self.cob_1s_aggregated[symbol])[-count:]
return []
except Exception as e:
logger.error(f"Error getting COB 1s aggregated for {symbol}: {e}")
return []
def get_latest_cob_data(self, symbol: str) -> Optional[Dict]:
"""Get latest COB data for a symbol (Enhanced WebSocket cache first, then raw ticks)"""
try:
# Prefer the Enhanced WebSocket data cache when it has a snapshot
if symbol in self.cob_websocket_data and self.cob_websocket_data[symbol]:
return self.cob_websocket_data[symbol]
# Fall back to the most recent raw COB tick
if symbol in self.cob_raw_ticks and len(self.cob_raw_ticks[symbol]) > 0:
return self.cob_raw_ticks[symbol][-1]
return None
except Exception as e:
logger.error(f"Error getting latest COB data for {symbol}: {e}")
return None
def get_latest_cob_aggregated(self, symbol: str) -> Optional[Dict]:
"""Get latest 1s aggregated COB data for a symbol"""
try:
if symbol in self.cob_1s_aggregated and self.cob_1s_aggregated[symbol]:
return self.cob_1s_aggregated[symbol][-1]
return None
except Exception as e:
logger.error(f"Error getting latest COB aggregated for {symbol}: {e}")
return None
def subscribe_to_cob_raw_ticks(self, callback: Callable[[str, Dict], None]) -> str:
"""Subscribe to raw COB tick updates"""
subscriber_id = str(uuid.uuid4())
self.cob_data_callbacks.append(callback)
logger.info(f"COB raw tick subscriber added: {subscriber_id}")
return subscriber_id
def subscribe_to_cob_aggregated(self, callback: Callable[[str, Dict], None]) -> str:
"""Subscribe to 1s aggregated COB updates"""
subscriber_id = str(uuid.uuid4())
self.cob_aggregated_callbacks.append(callback)
logger.info(f"COB aggregated subscriber added: {subscriber_id}")
return subscriber_id
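# Example subscription (illustrative sketch; `dp` is an initialized provider and the
# callback signature follows the Callable[[str, Dict], None] type hints above):
#   def on_cob_tick(symbol: str, tick: dict) -> None:
#       print(symbol, tick.get('stats', {}).get('mid_price'))
#   sub_id = dp.subscribe_to_cob_raw_ticks(on_cob_tick)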
def get_cob_price_buckets(self, symbol: str, timeframe_seconds: int = 60) -> Dict:
"""Get price bucket analysis for a timeframe"""
try:
# Get aggregated data for the timeframe
count = min(timeframe_seconds, 900) # Max 15 minutes
aggregated_data = self.get_cob_1s_aggregated(symbol, count)
if not aggregated_data:
return {}
# Combine buckets across the timeframe
combined_bid_buckets = {}
combined_ask_buckets = {}
for data in aggregated_data:
bid_buckets = data.get('bid_buckets', {})
ask_buckets = data.get('ask_buckets', {})
for bucket, volume in bid_buckets.items():
if bucket not in combined_bid_buckets:
combined_bid_buckets[bucket] = 0
combined_bid_buckets[bucket] += volume
for bucket, volume in ask_buckets.items():
if bucket not in combined_ask_buckets:
combined_ask_buckets[bucket] = 0
combined_ask_buckets[bucket] += volume
return {
'symbol': symbol,
'timeframe_seconds': timeframe_seconds,
'bucket_size_usd': 1.0,
'bid_buckets': dict(sorted(combined_bid_buckets.items(), reverse=True)),
'ask_buckets': dict(sorted(combined_ask_buckets.items())),
'total_bid_volume': sum(combined_bid_buckets.values()),
'total_ask_volume': sum(combined_ask_buckets.values()),
'bucket_count': len(combined_bid_buckets) + len(combined_ask_buckets)
}
except Exception as e:
logger.error(f"Error getting COB price buckets for {symbol}: {e}")
return {}
def get_cob_websocket_status(self) -> Dict[str, Any]:
"""Get COB WebSocket status for dashboard (Enhanced WebSocket summary when available, COBIntegration otherwise)"""
try:
# Prefer the Enhanced COB WebSocket status summary when it is initialized
if self.enhanced_cob_websocket:
return self.enhanced_cob_websocket.get_status_summary()
# Fall back to COBIntegration-based status
if self.cob_integration:
return {
'status': 'active',
'symbols': self.symbols,
'websocket_status': self.cob_integration.get_websocket_status(),
'raw_tick_counts': {symbol: len(self.cob_raw_ticks[symbol]) for symbol in self.symbols},
'aggregated_counts': {symbol: len(self.cob_1s_aggregated[symbol]) for symbol in self.symbols}
}
return {
'status': 'inactive',
'symbols': self.symbols,
'error': 'COB integration not initialized'
}
except Exception as e:
logger.error(f"Error getting COB WebSocket status: {e}")
return {
'status': 'error',
'error': str(e)
}
async def _start_fallback_websocket_streaming(self):
"""Fallback to old WebSocket method if Enhanced COB WebSocket fails"""
try:
logger.warning("⚠️ Starting fallback WebSocket streaming")
# Start old WebSocket for each symbol
for symbol in self.symbols:
task = asyncio.create_task(self._websocket_stream(symbol))
self.websocket_tasks[symbol] = task
except Exception as e:
logger.error(f"❌ Error starting fallback WebSocket: {e}")
async def _websocket_stream(self, symbol: str):
"""WebSocket stream for a single symbol using trade stream for better granularity"""
binance_symbol = symbol.replace('/', '').upper()
url = f"wss://stream.binance.com:9443/ws/{binance_symbol.lower()}@trade"
while self.is_streaming:
try:
logger.info(f"Connecting to WebSocket for {symbol}: {url}")
async with websockets.connect(url) as websocket:
logger.info(f"WebSocket connected for {symbol}")
async for message in websocket:
if not self.is_streaming:
break
try:
await self._process_trade_message(binance_symbol, message)
except Exception as e:
logger.warning(f"Error processing trade message for {symbol}: {e}")
except Exception as e:
logger.error(f"WebSocket error for {symbol}: {e}")
self.distribution_stats['distribution_errors'] += 1
if self.is_streaming:
logger.info(f"Reconnecting WebSocket for {symbol} in 5 seconds...")
await asyncio.sleep(5)
async def _process_trade_message(self, symbol: str, message: str):
"""Process incoming trade message and distribute to subscribers"""
try:
trade_data = json.loads(message)
# Extract trade information
price = float(trade_data.get('p', 0))
quantity = float(trade_data.get('q', 0))
timestamp = datetime.fromtimestamp(int(trade_data.get('T', 0)) / 1000)
is_buyer_maker = trade_data.get('m', False)
trade_id = trade_data.get('t', '')
# Calculate volume in USDT
volume_usdt = price * quantity
# Data validation
if not self._validate_tick_data(symbol, price, volume_usdt):
logger.warning(f"Invalid tick data for {symbol}: price={price}, volume={volume_usdt}")
return
# Process raw tick through aggregator
side = 'sell' if is_buyer_maker else 'buy'
raw_tick, completed_bar = self.tick_aggregator.process_tick(
symbol=symbol,
timestamp=timestamp,
price=price,
volume=volume_usdt,
quantity=quantity,
side=side,
trade_id=str(trade_id)
)
# Update statistics
self.distribution_stats['total_ticks_received'] += 1
self.distribution_stats['ticks_per_symbol'][symbol] += 1
self.distribution_stats['last_tick_time'][symbol] = timestamp
self.last_prices[symbol] = price
if raw_tick:
self.distribution_stats['raw_ticks_processed'] += 1
# Notify raw tick callbacks
for callback in self.raw_tick_callbacks:
try:
callback(raw_tick)
except Exception as e:
logger.error(f"Error in raw tick callback: {e}")
if completed_bar:
self.distribution_stats['ohlcv_bars_created'] += 1
# Notify OHLCV bar callbacks
for callback in self.ohlcv_bar_callbacks:
try:
callback(completed_bar)
except Exception as e:
logger.error(f"Error in OHLCV bar callback: {e}")
# Create standardized tick for legacy compatibility
tick = MarketTick(
symbol=symbol,
timestamp=timestamp,
price=price,
volume=volume_usdt,
quantity=quantity,
side=side,
trade_id=str(trade_id),
is_buyer_maker=is_buyer_maker,
raw_data=trade_data
)
# Add to buffer
self.tick_buffers[symbol].append(tick)
# Update current prices and candles
await self._process_tick(symbol, tick)
# Distribute to all subscribers
self._distribute_tick(tick)
except Exception as e:
logger.error(f"Error processing trade message for {symbol}: {e}")
async def _process_tick(self, symbol: str, tick: MarketTick):
"""Process a single tick and update candles"""
try:
# Update current price
with self.data_lock:
self.current_prices[symbol] = tick.price
# Initialize real-time data structure if needed
if symbol not in self.real_time_data:
self.real_time_data[symbol] = {}
for tf in self.timeframes:
self.real_time_data[symbol][tf] = deque(maxlen=1000)
# Create tick record for candle updates
tick_record = {
'timestamp': tick.timestamp,
'price': tick.price,
'volume': tick.volume
}
# Update all timeframes
for timeframe in self.timeframes:
self._update_candle(symbol, timeframe, tick_record)
except Exception as e:
logger.error(f"Error processing tick for {symbol}: {e}")
def _update_candle(self, symbol: str, timeframe: str, tick: Dict):
"""Update candle for specific timeframe"""
try:
timeframe_secs = self.timeframe_seconds.get(timeframe, 3600)
current_time = tick['timestamp']
# Calculate candle start time using proper datetime truncation
if isinstance(current_time, datetime):
timestamp_seconds = current_time.timestamp()
else:
timestamp_seconds = current_time.timestamp() if hasattr(current_time, 'timestamp') else current_time
# Truncate to timeframe boundary
candle_start_seconds = int(timestamp_seconds // timeframe_secs) * timeframe_secs
candle_start = datetime.fromtimestamp(candle_start_seconds)
# Get current candle queue
candle_queue = self.real_time_data[symbol][timeframe]
# Check if we need a new candle
if not candle_queue or candle_queue[-1]['timestamp'] != candle_start:
# Create new candle
new_candle = {
'timestamp': candle_start,
'open': tick['price'],
'high': tick['price'],
'low': tick['price'],
'close': tick['price'],
'volume': tick['volume']
}
candle_queue.append(new_candle)
else:
# Update existing candle
current_candle = candle_queue[-1]
current_candle['high'] = max(current_candle['high'], tick['price'])
current_candle['low'] = min(current_candle['low'], tick['price'])
current_candle['close'] = tick['price']
current_candle['volume'] += tick['volume']
except Exception as e:
logger.error(f"Error updating candle for {symbol} {timeframe}: {e}")
def get_latest_candles(self, symbol: str, timeframe: str, limit: int = 100) -> pd.DataFrame:
"""Get the latest candles from cached data only"""
try:
# Get cached data
cached_df = self.get_historical_data(symbol, timeframe, limit=limit)
# Get real-time data if available
with self.data_lock:
if symbol in self.real_time_data and timeframe in self.real_time_data[symbol]:
real_time_candles = list(self.real_time_data[symbol][timeframe])
if real_time_candles:
# Convert to DataFrame
rt_df = pd.DataFrame(real_time_candles)
if cached_df is not None and not cached_df.empty:
# Combine cached and real-time
# Remove overlapping candles from cached data
if not rt_df.empty:
cutoff_time = rt_df['timestamp'].min()
cached_df = cached_df[cached_df.index < cutoff_time]
# Concatenate
combined_df = pd.concat([cached_df, rt_df], ignore_index=True)
else:
combined_df = rt_df
return combined_df.tail(limit)
# Return just cached data if no real-time data
return cached_df.tail(limit) if cached_df is not None else pd.DataFrame()
except Exception as e:
logger.error(f"Error getting latest candles for {symbol} {timeframe}: {e}")
return pd.DataFrame()
def get_current_price(self, symbol: str) -> Optional[float]:
"""Get current price for a symbol from cached data"""
try:
# Try to get from 1s candle first (most recent)
for tf in ['1s', '1m', '1h', '1d']:
if symbol in self.cached_data and tf in self.cached_data[symbol]:
df = self.cached_data[symbol][tf]
if not df.empty:
return float(df.iloc[-1]['close'])
logger.warning(f"No cached price data available for {symbol}")
return None
except Exception as e:
logger.error(f"Error getting current price for {symbol}: {e}")
return None
def calculate_williams_pivot_points(self, symbol: str, force_recalculate: bool = False) -> Dict[int, TrendLevel]:
"""
Calculate Williams Market Structure pivot points for a symbol
Args:
symbol: Trading symbol (e.g., 'ETH/USDT')
force_recalculate: Force recalculation even if cache is fresh
Returns:
Dictionary of trend levels with pivot points
"""
try:
# Check if we need to recalculate
now = datetime.now()
if (not force_recalculate and
symbol in self.last_pivot_calculation and
now - self.last_pivot_calculation[symbol] < self.pivot_calculation_interval):
# Return cached results
return self.pivot_points_cache.get(symbol, {})
# Get 1s OHLCV data for Williams Market Structure calculation
df_1s = self.get_historical_data(symbol, '1s', limit=1000)
if df_1s is None or len(df_1s) < 50:
logger.warning(f"Insufficient 1s data for Williams pivot calculation: {symbol}")
return {}
# Convert DataFrame to numpy array for Williams calculation
# Format: [timestamp_ms, open, high, low, close, volume]
ohlcv_array = np.column_stack([
df_1s.index.astype(np.int64) // 10**6, # Convert to milliseconds
df_1s['open'].values,
df_1s['high'].values,
df_1s['low'].values,
df_1s['close'].values,
df_1s['volume'].values
])
# Calculate recursive pivot points using Williams Market Structure
williams = self.williams_structure[symbol]
pivot_levels = williams.calculate_recursive_pivot_points(ohlcv_array)
# Cache the results
self.pivot_points_cache[symbol] = pivot_levels
self.last_pivot_calculation[symbol] = now
logger.debug(f"Calculated Williams pivot points for {symbol}: {len(pivot_levels)} levels")
return pivot_levels
except Exception as e:
logger.error(f"Error calculating Williams pivot points for {symbol}: {e}")
return {}
def get_pivot_features_for_ml(self, symbol: str) -> np.ndarray:
"""
Get pivot point features for machine learning models
Returns a 250-element feature vector containing:
- Recent pivot points (price, strength, type) for each level
- Trend direction and strength for each level
- Time since last pivot for each level
"""
try:
# Ensure we have fresh pivot points
pivot_levels = self.calculate_williams_pivot_points(symbol)
if not pivot_levels:
logger.warning(f"No pivot points available for {symbol}")
return np.zeros(250, dtype=np.float32)
# Use Williams Market Structure to extract ML features
williams = self.williams_structure[symbol]
features = williams.get_pivot_features_for_ml(symbol)
return features
except Exception as e:
logger.error(f"Error getting pivot features for ML: {e}")
return np.zeros(250, dtype=np.float32)
def get_market_structure_summary(self, symbol: str) -> Dict[str, Any]:
"""
Get current market structure summary for dashboard display
Returns:
Dictionary containing market structure information
"""
try:
# Ensure we have fresh pivot points
pivot_levels = self.calculate_williams_pivot_points(symbol)
if not pivot_levels:
return {
'symbol': symbol,
'levels': {},
'overall_trend': 'sideways',
'overall_strength': 0.0,
'last_update': datetime.now().isoformat(),
'error': 'No pivot points available'
}
# Use Williams Market Structure to get summary
williams = self.williams_structure[symbol]
structure = williams.get_current_market_structure()
structure['symbol'] = symbol
return structure
except Exception as e:
logger.error(f"Error getting market structure summary for {symbol}: {e}")
return {
'symbol': symbol,
'levels': {},
'overall_trend': 'sideways',
'overall_strength': 0.0,
'last_update': datetime.now().isoformat(),
'error': str(e)
}
def get_recent_pivot_points(self, symbol: str, level: int = 1, count: int = 10) -> List[PivotPoint]:
"""
Get recent pivot points for a specific level
Args:
symbol: Trading symbol
level: Pivot level (1-5)
count: Number of recent pivots to return
Returns:
List of recent pivot points
"""
try:
pivot_levels = self.calculate_williams_pivot_points(symbol)
if level not in pivot_levels:
return []
trend_level = pivot_levels[level]
recent_pivots = trend_level.pivot_points[-count:] if len(trend_level.pivot_points) >= count else trend_level.pivot_points
return recent_pivots
except Exception as e:
logger.error(f"Error getting recent pivot points for {symbol} level {level}: {e}")
return []
def get_price_at_index(self, symbol: str, index: int, timeframe: str = '1m') -> Optional[float]:
"""Get price at specific index for backtesting from cached data"""
try:
if symbol in self.cached_data and timeframe in self.cached_data[symbol]:
df = self.cached_data[symbol][timeframe]
if not df.empty and 0 <= index < len(df):
return float(df.iloc[index]['close'])
return None
except Exception as e:
logger.error(f"Error getting price at index {index}: {e}")
return None
def get_feature_matrix(self, symbol: str, timeframes: List[str] = None,
window_size: int = 20) -> Optional[np.ndarray]:
"""
Get comprehensive feature matrix for multiple timeframes with technical indicators
Returns:
np.ndarray: Shape (n_timeframes, window_size, n_features)
Each timeframe becomes a separate channel for CNN
"""
try:
if timeframes is None:
timeframes = self.timeframes
feature_channels = []
common_feature_names = None
# First pass: determine common features across all timeframes
timeframe_features = {}
for tf in timeframes:
logger.debug(f"Processing timeframe {tf} for {symbol}")
# Use cached data directly
if symbol in self.cached_data and tf in self.cached_data[symbol]:
df = self.cached_data[symbol][tf]
if not df.empty and len(df) >= window_size:
df = df.tail(window_size + 100) # Get enough data for indicators
else:
logger.warning(f"Insufficient cached data for {symbol} {tf}: {len(df) if not df.empty else 0} rows")
continue
else:
logger.warning(f"No cached data for {symbol} {tf}")
continue
# Get feature columns
basic_cols = ['open', 'high', 'low', 'close', 'volume']
indicator_cols = [col for col in df.columns
if col not in basic_cols + ['timestamp'] and not col.startswith('unnamed')]
selected_features = self._select_cnn_features(df, basic_cols, indicator_cols)
timeframe_features[tf] = (df, selected_features)
if common_feature_names is None:
common_feature_names = set(selected_features)
else:
common_feature_names = common_feature_names.intersection(set(selected_features))
if not common_feature_names:
logger.error(f"No common features found across timeframes for {symbol}")
return None
# Convert to sorted list for consistent ordering
common_feature_names = sorted(list(common_feature_names))
# logger.info(f"Using {len(common_feature_names)} common features: {common_feature_names}")
# Second pass: create feature channels with common features
for tf in timeframes:
if tf not in timeframe_features:
continue
df, _ = timeframe_features[tf]
# Use only common features
try:
tf_features = self._normalize_features(df[common_feature_names].tail(window_size), symbol=symbol)
if tf_features is not None and len(tf_features) == window_size:
feature_channels.append(tf_features.values)
logger.debug(f"Added {len(common_feature_names)} features for {tf}")
else:
logger.warning(f"Feature normalization failed for {tf}")
except Exception as e:
logger.error(f"Error processing features for {tf}: {e}")
continue
if not feature_channels:
logger.error(f"No valid feature channels created for {symbol}")
return None
# Verify all channels have the same shape
shapes = [channel.shape for channel in feature_channels]
if len(set(shapes)) > 1:
logger.error(f"Shape mismatch in feature channels: {shapes}")
return None
# Stack all timeframe channels
feature_matrix = np.stack(feature_channels, axis=0)
logger.debug(f"Created feature matrix for {symbol}: {feature_matrix.shape} "
f"({len(feature_channels)} timeframes, {window_size} steps, {len(common_feature_names)} features)")
return feature_matrix
except Exception as e:
logger.error(f"Error creating feature matrix for {symbol}: {e}")
import traceback
logger.error(traceback.format_exc())
return None
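# Example usage (illustrative sketch; assumes cached data is populated for the
# requested timeframes on an initialized provider `dp`):
#   matrix = dp.get_feature_matrix('ETH/USDT', timeframes=['1m', '1h'], window_size=20)
#   # matrix.shape -> (2, 20, n_common_features) when both timeframes have enough data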
def _select_cnn_features(self, df: pd.DataFrame, basic_cols: List[str], indicator_cols: List[str]) -> List[str]:
"""Select the most important features for CNN training"""
try:
selected = []
# Always include basic OHLCV (normalized)
selected.extend(basic_cols)
# Priority indicators (most informative for CNNs)
priority_indicators = [
# Trend indicators
'sma_10', 'sma_20', 'sma_50', 'ema_12', 'ema_26', 'ema_50',
'macd', 'macd_signal', 'macd_histogram',
'adx', 'adx_pos', 'adx_neg', 'psar',
# Momentum indicators
'rsi_14', 'rsi_7', 'rsi_21',
'stoch_k', 'stoch_d', 'williams_r', 'ultimate_osc',
# Volatility indicators
'bb_upper', 'bb_lower', 'bb_middle', 'bb_width', 'bb_percent',
'atr', 'keltner_upper', 'keltner_lower', 'keltner_middle',
# Volume indicators
'volume_sma_10', 'volume_sma_20', 'obv', 'vpt', 'mfi', 'ad_line', 'vwap',
# Price action
'price_position', 'true_range', 'roc',
# Custom composites
'trend_strength', 'momentum_composite', 'volatility_regime'
]
# Add available priority indicators
for indicator in priority_indicators:
if indicator in indicator_cols:
selected.append(indicator)
# Add any other technical indicators not in priority list (limit to avoid curse of dimensionality)
remaining_indicators = [col for col in indicator_cols if col not in selected]
if remaining_indicators:
# Limit to 10 additional indicators
selected.extend(remaining_indicators[:10])
# Verify all selected features exist in dataframe
final_selected = [col for col in selected if col in df.columns]
logger.debug(f"Selected {len(final_selected)} features from {len(df.columns)} available columns")
return final_selected
except Exception as e:
logger.error(f"Error selecting CNN features: {e}")
return basic_cols # Fallback to basic OHLCV
def _normalize_features(self, df: pd.DataFrame, symbol: Optional[str] = None) -> Optional[pd.DataFrame]:
"""Normalize features for CNN training using unified pivot bounds across all timeframes"""
try:
df_norm = df.copy()
if symbol and symbol in self.pivot_bounds:
# Unified pivot-based normalization: every timeframe shares the same 30-day bounds
bounds = self.pivot_bounds[symbol]
price_range = bounds.get_price_range()
volume_range = bounds.volume_max - bounds.volume_min
logger.debug(f"Using unified pivot-based normalization for {symbol} (price_range: {price_range:.2f})")
# Normalize price columns against the pivot price range
for col in ['open', 'high', 'low', 'close']:
if col in df_norm.columns and price_range > 0:
df_norm[col] = (df_norm[col] - bounds.price_min) / price_range
# Normalize volume against the pivot volume range
if 'volume' in df_norm.columns and volume_range > 0:
df_norm['volume'] = (df_norm['volume'] - bounds.volume_min) / volume_range
# Remaining indicator columns are left as-is and only cleaned/clipped below
# Fill gaps and keep all values in a neural-network friendly range
df_norm = df_norm.fillna(0)
df_norm = df_norm.clip(-10, 10)
return df_norm
except Exception as e:
logger.error(f"Error in unified feature normalization for {symbol}: {e}")
return None
def get_historical_data_for_inference(self, symbol: str, timeframe: str, limit: int = 300) -> Optional[pd.DataFrame]:
"""Get normalized historical data specifically for model inference"""
try:
# Get raw historical data
raw_df = self.get_historical_data(symbol, timeframe, limit)
if raw_df is None or raw_df.empty:
return None
# Apply normalization for inference
normalized_df = self._normalize_features(raw_df, symbol)
logger.debug(f"Retrieved normalized historical data for inference: {symbol} {timeframe} ({len(normalized_df)} records)")
return normalized_df
except Exception as e:
logger.error(f"Error getting normalized historical data for inference: {symbol} {timeframe}: {e}")
return None
def get_multi_symbol_features_for_inference(self, symbols_timeframes: List[Tuple[str, str]], limit: int = 300) -> Dict[str, Dict[str, pd.DataFrame]]:
"""Get normalized multi-symbol feature matrices for model inference"""
try:
logger.info("Preparing normalized multi-symbol features for model inference...")
symbol_features = {}
for symbol, timeframe in symbols_timeframes:
if symbol not in symbol_features:
symbol_features[symbol] = {}
# Get normalized data for inference
normalized_df = self.get_historical_data_for_inference(symbol, timeframe, limit)
if normalized_df is not None and not normalized_df.empty:
symbol_features[symbol][timeframe] = normalized_df
logger.debug(f"Prepared normalized features for {symbol} {timeframe}: {len(normalized_df)} records")
else:
logger.warning(f"No normalized data available for {symbol} {timeframe}")
symbol_features[symbol][timeframe] = None
return symbol_features
except Exception as e:
logger.error(f"Error creating multi-symbol feature matrix: {e}")
return None
def get_cnn_features_for_inference(self, symbol: str, timeframe: str = '1m', window_size: int = 60) -> Optional[np.ndarray]:
"""Get normalized CNN features for a specific symbol and timeframe"""
try:
# Get normalized data
df = self.get_historical_data_for_inference(symbol, timeframe, limit=300)
if df is None or df.empty:
return None
# Extract recent window for CNN
recent_data = df.tail(window_size)
# Extract OHLCV features
features = recent_data[['open', 'high', 'low', 'close', 'volume']].values
logger.debug(f"Extracted CNN features for {symbol} {timeframe}: {features.shape}")
return features.flatten()
except Exception as e:
logger.error(f"Error extracting CNN features for {symbol} {timeframe}: {e}")
return None
def get_dqn_state_for_inference(self, symbols_timeframes: List[Tuple[str, str]], target_size: int = 100) -> Optional[np.ndarray]:
"""Get normalized DQN state vector combining multiple symbols and timeframes"""
try:
state_components = []
for symbol, timeframe in symbols_timeframes:
df = self.get_historical_data_for_inference(symbol, timeframe, limit=50)
if df is not None and not df.empty:
# Extract key features for state
latest = df.iloc[-1]
state_features = [
latest['close'], # Current price (normalized)
latest['volume'], # Current volume (normalized)
df['close'].pct_change().iloc[-1] if len(df) > 1 else 0, # Price change
]
state_components.extend(state_features)
if state_components:
# Pad or truncate to expected DQN state size
if len(state_components) < target_size:
state_components.extend([0] * (target_size - len(state_components)))
else:
state_components = state_components[:target_size]
state_vector = np.array(state_components, dtype=np.float32)
logger.debug(f"Created DQN state vector: {len(state_vector)} dimensions")
return state_vector
return None
except Exception as e:
logger.error(f"Error creating DQN state for inference: {e}")
return None
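# Example usage (illustrative sketch; `dp` is an initialized provider):
#   state = dp.get_dqn_state_for_inference([('ETH/USDT', '1m'), ('BTC/USDT', '1m')], target_size=100)
#   # two pairs contribute 3 features each, zero-padded to a 100-element float32 vector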
def get_transformer_sequences_for_inference(self, symbols_timeframes: List[Tuple[str, str]], seq_length: int = 150) -> List[np.ndarray]:
"""Get normalized sequences for transformer inference"""
try:
sequences = []
for symbol, timeframe in symbols_timeframes:
df = self.get_historical_data_for_inference(symbol, timeframe, limit=300)
if df is not None and not df.empty:
# Use last seq_length points as sequence
sequence = df.tail(seq_length)[['open', 'high', 'low', 'close', 'volume']].values
sequences.append(sequence)
logger.debug(f"Created transformer sequence for {symbol} {timeframe}: {sequence.shape}")
return sequences
except Exception as e:
logger.error(f"Error creating transformer sequences for inference: {e}")
return []