cleanup and removed dummy data

Dobromir Popov
2025-07-26 23:35:14 +03:00
parent 3eb6335169
commit 87942d3807
14 changed files with 220 additions and 2465 deletions

View File

@@ -1,190 +0,0 @@
"""
Simplified Data Cache System
Replaces complex FIFO queues with a simple current state cache.
Supports unordered updates and extensible data types.
"""
import threading
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from collections import defaultdict
import pandas as pd
import numpy as np
logger = logging.getLogger(__name__)
@dataclass
class DataCacheEntry:
"""Single cache entry with metadata"""
data: Any
timestamp: datetime
source: str = "unknown"
version: int = 1
class DataCache:
"""
Simplified data cache that stores only the latest data for each type.
Thread-safe and supports unordered updates from multiple sources.
"""
def __init__(self):
self.cache: Dict[str, Dict[str, DataCacheEntry]] = defaultdict(dict) # {data_type: {symbol: entry}}
self.locks: Dict[str, threading.RLock] = defaultdict(threading.RLock) # Per data_type locks
self.update_callbacks: Dict[str, List[Callable]] = defaultdict(list) # Update notifications
# Historical data storage (loaded once)
self.historical_data: Dict[str, Dict[str, pd.DataFrame]] = defaultdict(dict) # {symbol: {timeframe: df}}
self.historical_locks: Dict[str, threading.RLock] = defaultdict(threading.RLock)
logger.info("DataCache initialized with simplified architecture")
def update(self, data_type: str, symbol: str, data: Any, source: str = "unknown") -> bool:
"""
Update cache with latest data (thread-safe, unordered updates supported)
Args:
data_type: Type of data ('ohlcv_1s', 'technical_indicators', etc.)
symbol: Trading symbol
data: New data to store
source: Source of the update
Returns:
bool: True if updated successfully
"""
try:
with self.locks[data_type]:
# Create or update entry
old_entry = self.cache[data_type].get(symbol)
new_version = (old_entry.version + 1) if old_entry else 1
self.cache[data_type][symbol] = DataCacheEntry(
data=data,
timestamp=datetime.now(),
source=source,
version=new_version
)
# Notify callbacks
for callback in self.update_callbacks[data_type]:
try:
callback(symbol, data, source)
except Exception as e:
logger.error(f"Error in update callback: {e}")
return True
except Exception as e:
logger.error(f"Error updating cache {data_type}/{symbol}: {e}")
return False
def get(self, data_type: str, symbol: str) -> Optional[Any]:
"""Get latest data for a type/symbol"""
try:
with self.locks[data_type]:
entry = self.cache[data_type].get(symbol)
return entry.data if entry else None
except Exception as e:
logger.error(f"Error getting cache {data_type}/{symbol}: {e}")
return None
def get_with_metadata(self, data_type: str, symbol: str) -> Optional[DataCacheEntry]:
"""Get latest data with metadata"""
try:
with self.locks[data_type]:
return self.cache[data_type].get(symbol)
except Exception as e:
logger.error(f"Error getting cache metadata {data_type}/{symbol}: {e}")
return None
def get_all(self, data_type: str) -> Dict[str, Any]:
"""Get all data for a data type"""
try:
with self.locks[data_type]:
return {symbol: entry.data for symbol, entry in self.cache[data_type].items()}
except Exception as e:
logger.error(f"Error getting all cache data for {data_type}: {e}")
return {}
def has_data(self, data_type: str, symbol: str, max_age_seconds: Optional[int] = None) -> bool:
"""Check if we have recent data"""
try:
with self.locks[data_type]:
entry = self.cache[data_type].get(symbol)
if not entry:
return False
if max_age_seconds is not None:
age = (datetime.now() - entry.timestamp).total_seconds()
return age <= max_age_seconds
return True
except Exception as e:
logger.error(f"Error checking cache data {data_type}/{symbol}: {e}")
return False
def register_callback(self, data_type: str, callback: Callable[[str, Any, str], None]):
"""Register callback for data updates"""
self.update_callbacks[data_type].append(callback)
def get_status(self) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Get cache status for monitoring"""
status = {}
for data_type in self.cache:
with self.locks[data_type]:
status[data_type] = {}
for symbol, entry in self.cache[data_type].items():
age_seconds = (datetime.now() - entry.timestamp).total_seconds()
status[data_type][symbol] = {
'timestamp': entry.timestamp.isoformat(),
'age_seconds': age_seconds,
'source': entry.source,
'version': entry.version,
'has_data': entry.data is not None
}
return status
# Historical data management
def store_historical_data(self, symbol: str, timeframe: str, df: pd.DataFrame):
"""Store historical data (loaded once at startup)"""
try:
with self.historical_locks[symbol]:
self.historical_data[symbol][timeframe] = df.copy()
logger.info(f"Stored {len(df)} historical bars for {symbol} {timeframe}")
except Exception as e:
logger.error(f"Error storing historical data {symbol}/{timeframe}: {e}")
def get_historical_data(self, symbol: str, timeframe: str) -> Optional[pd.DataFrame]:
"""Get historical data"""
try:
with self.historical_locks[symbol]:
return self.historical_data[symbol].get(timeframe)
except Exception as e:
logger.error(f"Error getting historical data {symbol}/{timeframe}: {e}")
return None
def has_historical_data(self, symbol: str, timeframe: str, min_bars: int = 100) -> bool:
"""Check if we have sufficient historical data"""
try:
with self.historical_locks[symbol]:
df = self.historical_data[symbol].get(timeframe)
return df is not None and len(df) >= min_bars
except Exception:
return False
# Global cache instance
_data_cache_instance = None
def get_data_cache() -> DataCache:
"""Get the global data cache instance"""
global _data_cache_instance
if _data_cache_instance is None:
_data_cache_instance = DataCache()
return _data_cache_instance
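For reference, a minimal usage sketch of the cache above. The import path core.data_cache is inferred from the imports in the files further down; the callback signature matches register_callback:

from core.data_cache import get_data_cache  # path inferred from imports below

cache = get_data_cache()

# Unordered updates: the latest write wins per (data_type, symbol) pair
cache.update('ohlcv_1s', 'ETH/USDT', {'close': 3500.0}, source='example')

# Readers get the most recent value, or None if nothing has been stored
print(cache.get('ohlcv_1s', 'ETH/USDT'))

# Freshness check: accept only data younger than 5 seconds
print(cache.has_data('ohlcv_1s', 'ETH/USDT', max_age_seconds=5))

# Note: callbacks run inside update() while the per-data_type lock is held
cache.register_callback('ohlcv_1s', lambda sym, data, src: print(f"{sym} updated by {src}"))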

View File

@@ -114,42 +114,32 @@ class BaseDataInput:
FIXED_FEATURE_SIZE = 7850
features = []
# OHLCV features for ETH (300 frames x 4 timeframes x 5 features = 6000 features)
# OHLCV features for ETH (up to 300 frames x 4 timeframes x 5 features)
for ohlcv_list in [self.ohlcv_1s, self.ohlcv_1m, self.ohlcv_1h, self.ohlcv_1d]:
# Ensure exactly 300 frames by padding or truncating
# Use actual data only, up to 300 frames
ohlcv_frames = ohlcv_list[-300:] if len(ohlcv_list) >= 300 else ohlcv_list
# Pad with zeros if not enough data
while len(ohlcv_frames) < 300:
# Create a dummy OHLCV bar with zeros
dummy_bar = OHLCVBar(
symbol="ETH/USDT",
timestamp=datetime.now(),
open=0.0, high=0.0, low=0.0, close=0.0, volume=0.0,
timeframe="1s"
)
ohlcv_frames.insert(0, dummy_bar)
# Extract features from exactly 300 frames
# Extract features from actual frames
for bar in ohlcv_frames:
features.extend([bar.open, bar.high, bar.low, bar.close, bar.volume])
# Pad with zeros if we have fewer than 300 frames
frames_needed = 300 - len(ohlcv_frames)
if frames_needed > 0:
features.extend([0.0] * (frames_needed * 5)) # 5 features per frame
# BTC OHLCV features (300 frames x 5 features = 1500 features)
# BTC OHLCV features (up to 300 frames x 5 features = 1500 features)
btc_frames = self.btc_ohlcv_1s[-300:] if len(self.btc_ohlcv_1s) >= 300 else self.btc_ohlcv_1s
# Pad BTC data if needed
while len(btc_frames) < 300:
dummy_bar = OHLCVBar(
symbol="BTC/USDT",
timestamp=datetime.now(),
open=0.0, high=0.0, low=0.0, close=0.0, volume=0.0,
timeframe="1s"
)
btc_frames.insert(0, dummy_bar)
# Extract features from actual BTC frames
for bar in btc_frames:
features.extend([bar.open, bar.high, bar.low, bar.close, bar.volume])
# Pad with zeros if we have fewer than 300 frames
btc_frames_needed = 300 - len(btc_frames)
if btc_frames_needed > 0:
features.extend([0.0] * (btc_frames_needed * 5)) # 5 features per frame
# COB features (FIXED SIZE: 200 features)
cob_features = []
if self.cob_data:

View File

@@ -224,6 +224,12 @@ class DataProvider:
self.cob_data_cache[binance_symbol] = deque(maxlen=300) # 5 minutes of COB data
self.training_data_cache[binance_symbol] = deque(maxlen=1000) # Training data buffer
# Pre-built OHLCV cache for instant BaseDataInput building (optimization from SimplifiedDataIntegration)
self._ohlcv_cache = {} # {"symbol_timeframe": List[OHLCVBar]}
self._ohlcv_cache_lock = Lock()
self._last_cache_update = {} # {"symbol_timeframe": datetime}
self._cache_refresh_interval = 5 # seconds
# Data collection threads
self.data_collection_active = False
@@ -1387,6 +1393,175 @@ class DataProvider:
logger.error(f"Error applying pivot normalization for {symbol}: {e}")
return df
def build_base_data_input(self, symbol: str) -> Optional['BaseDataInput']:
"""
Build BaseDataInput from cached data (optimized for speed)
Args:
symbol: Trading symbol
Returns:
BaseDataInput with consistent data structure
"""
try:
from .data_models import BaseDataInput
# Get OHLCV data directly from optimized cache (no validation checks for speed)
ohlcv_1s_list = self._get_cached_ohlcv_bars(symbol, '1s', 300)
ohlcv_1m_list = self._get_cached_ohlcv_bars(symbol, '1m', 300)
ohlcv_1h_list = self._get_cached_ohlcv_bars(symbol, '1h', 300)
ohlcv_1d_list = self._get_cached_ohlcv_bars(symbol, '1d', 300)
# Get BTC reference data
btc_symbol = 'BTC/USDT'
btc_ohlcv_1s_list = self._get_cached_ohlcv_bars(btc_symbol, '1s', 300)
if not btc_ohlcv_1s_list:
# Use ETH data as fallback
btc_ohlcv_1s_list = ohlcv_1s_list
# Get cached data (fast lookups)
technical_indicators = self._get_latest_technical_indicators(symbol)
cob_data = self._get_latest_cob_data_object(symbol)
last_predictions = {} # TODO: Implement model prediction caching
# Build BaseDataInput (no validation for speed - assume data is good)
base_data = BaseDataInput(
symbol=symbol,
timestamp=datetime.now(),
ohlcv_1s=ohlcv_1s_list,
ohlcv_1m=ohlcv_1m_list,
ohlcv_1h=ohlcv_1h_list,
ohlcv_1d=ohlcv_1d_list,
btc_ohlcv_1s=btc_ohlcv_1s_list,
technical_indicators=technical_indicators,
cob_data=cob_data,
last_predictions=last_predictions
)
return base_data
except Exception as e:
logger.error(f"Error building BaseDataInput for {symbol}: {e}")
return None
def _get_cached_ohlcv_bars(self, symbol: str, timeframe: str, max_count: int) -> List['OHLCVBar']:
"""Get OHLCV data list from pre-built cache for instant access"""
try:
with self._ohlcv_cache_lock:
cache_key = f"{symbol}_{timeframe}"
# Check if we have fresh cached data (updated within last 5 seconds)
last_update = self._last_cache_update.get(cache_key)
if (last_update and
(datetime.now() - last_update).total_seconds() < self._cache_refresh_interval and
cache_key in self._ohlcv_cache):
cached_data = self._ohlcv_cache[cache_key]
return cached_data[-max_count:] if len(cached_data) >= max_count else cached_data
# Need to rebuild cache for this symbol/timeframe
data_list = self._build_ohlcv_bar_cache(symbol, timeframe, max_count)
# Cache the result
self._ohlcv_cache[cache_key] = data_list
self._last_cache_update[cache_key] = datetime.now()
return data_list[-max_count:] if len(data_list) >= max_count else data_list
except Exception as e:
logger.error(f"Error getting cached OHLCV bars for {symbol}/{timeframe}: {e}")
return []
def _build_ohlcv_bar_cache(self, symbol: str, timeframe: str, max_count: int) -> List['OHLCVBar']:
"""Build OHLCV bar cache from historical and current data"""
try:
from .data_models import OHLCVBar
data_list = []
# Get historical data first (this should be fast as it's already cached)
historical_df = self.get_historical_data(symbol, timeframe, limit=max_count)
if historical_df is not None and not historical_df.empty:
# Convert historical data to OHLCVBar objects
for idx, row in historical_df.tail(max_count).iterrows():
bar = OHLCVBar(
symbol=symbol,
timestamp=idx if hasattr(idx, 'to_pydatetime') else datetime.now(),
open=float(row['open']),
high=float(row['high']),
low=float(row['low']),
close=float(row['close']),
volume=float(row['volume']),
timeframe=timeframe
)
data_list.append(bar)
return data_list
except Exception as e:
logger.error(f"Error building OHLCV bar cache for {symbol}/{timeframe}: {e}")
return []
def _get_latest_technical_indicators(self, symbol: str) -> Dict[str, float]:
"""Get latest technical indicators for a symbol"""
try:
# Get latest data and calculate indicators
df = self.get_historical_data(symbol, '1h', limit=50)
if df is not None and not df.empty:
df_with_indicators = self._add_technical_indicators(df)
if not df_with_indicators.empty:
# Return the latest indicators as a dict
latest_row = df_with_indicators.iloc[-1]
indicators = {}
for col in df_with_indicators.columns:
if col not in ['open', 'high', 'low', 'close', 'volume', 'timestamp']:
indicators[col] = float(latest_row[col]) if pd.notna(latest_row[col]) else 0.0
return indicators
return {}
except Exception as e:
logger.error(f"Error getting technical indicators for {symbol}: {e}")
return {}
def _get_latest_cob_data_object(self, symbol: str) -> Optional['COBData']:
"""Get latest COB data as COBData object"""
try:
from .data_models import COBData
# Get latest COB data from cache
cob_data = self.get_latest_cob_data(symbol)
if cob_data and 'current_price' in cob_data:
return COBData(
symbol=symbol,
timestamp=datetime.now(),
current_price=cob_data['current_price'],
bucket_size=1.0 if 'ETH' in symbol else 10.0,
price_buckets=cob_data.get('price_buckets', {}),
bid_ask_imbalance=cob_data.get('bid_ask_imbalance', {}),
volume_weighted_prices=cob_data.get('volume_weighted_prices', {}),
order_flow_metrics=cob_data.get('order_flow_metrics', {}),
ma_1s_imbalance=cob_data.get('ma_1s_imbalance', {}),
ma_5s_imbalance=cob_data.get('ma_5s_imbalance', {}),
ma_15s_imbalance=cob_data.get('ma_15s_imbalance', {}),
ma_60s_imbalance=cob_data.get('ma_60s_imbalance', {})
)
return None
except Exception as e:
logger.error(f"Error getting COB data object for {symbol}: {e}")
return None
def invalidate_ohlcv_cache(self, symbol: str):
"""Invalidate OHLCV cache for a symbol when new data arrives"""
try:
with self._ohlcv_cache_lock:
# Remove cached data for all timeframes of this symbol
keys_to_remove = [key for key in self._ohlcv_cache.keys() if key.startswith(f"{symbol}_")]
for key in keys_to_remove:
if key in self._ohlcv_cache:
del self._ohlcv_cache[key]
if key in self._last_cache_update:
del self._last_cache_update[key]
except Exception as e:
logger.error(f"Error invalidating OHLCV cache for {symbol}: {e}")
def _add_basic_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""Add basic indicators for small datasets"""
try:

View File

@@ -179,12 +179,7 @@ class TradingOrchestrator:
self.fusion_decisions_count: int = 0
self.fusion_training_data: List[Any] = [] # Store training examples for decision model
# Simplified Data Integration - Replace complex FIFO queues with efficient cache
from core.simplified_data_integration import SimplifiedDataIntegration
self.data_integration = SimplifiedDataIntegration(
data_provider=self.data_provider,
symbols=[self.symbol] + self.ref_symbols
)
# Use data provider directly for BaseDataInput building (optimized)
# COB Integration - Real-time market microstructure data
self.cob_integration = None # Will be set to COBIntegration instance if available
@@ -250,8 +245,7 @@ class TradingOrchestrator:
self.data_provider.start_centralized_data_collection()
logger.info("Centralized data collection started - all models and dashboard will receive data")
# Initialize simplified data integration
self._initialize_simplified_data_integration()
# Data provider is already initialized and optimized
# Log initial data status
logger.info("Simplified data integration initialized")
@@ -897,31 +891,10 @@ class TradingOrchestrator:
try:
self.latest_cob_data[symbol] = cob_data
# Update data cache with COB data for BaseDataInput
if hasattr(self, 'data_integration') and self.data_integration:
# Convert cob_data to COBData format if needed
from .data_models import COBData
# Create COBData object from the raw cob_data
if 'price_buckets' in cob_data and 'current_price' in cob_data:
cob_data_obj = COBData(
symbol=symbol,
timestamp=datetime.now(),
current_price=cob_data['current_price'],
bucket_size=1.0 if 'ETH' in symbol else 10.0,
price_buckets=cob_data.get('price_buckets', {}),
bid_ask_imbalance=cob_data.get('bid_ask_imbalance', {}),
volume_weighted_prices=cob_data.get('volume_weighted_prices', {}),
order_flow_metrics=cob_data.get('order_flow_metrics', {}),
ma_1s_imbalance=cob_data.get('ma_1s_imbalance', {}),
ma_5s_imbalance=cob_data.get('ma_5s_imbalance', {}),
ma_15s_imbalance=cob_data.get('ma_15s_imbalance', {}),
ma_60s_imbalance=cob_data.get('ma_60s_imbalance', {})
)
# Update cache with COB data
self.data_integration.cache.update('cob_data', symbol, cob_data_obj, 'cob_integration')
logger.debug(f"Updated cache with COB data for {symbol}")
# Invalidate data provider cache when new COB data arrives
if hasattr(self.data_provider, 'invalidate_ohlcv_cache'):
self.data_provider.invalidate_ohlcv_cache(symbol)
logger.debug(f"Invalidated data provider cache for {symbol} due to COB update")
# Update dashboard
if self.dashboard and hasattr(self.dashboard, 'update_cob_data'):
@@ -3722,54 +3695,34 @@ class TradingOrchestrator:
"""
return self.db_manager.get_best_checkpoint_metadata(model_name)
# === SIMPLIFIED DATA MANAGEMENT ===
def _initialize_simplified_data_integration(self):
"""Initialize the simplified data integration system"""
try:
# Start the data integration system
self.data_integration.start()
logger.info("Simplified data integration started successfully")
except Exception as e:
logger.error(f"Error starting simplified data integration: {e}")
# === DATA MANAGEMENT ===
def _log_data_status(self):
"""Log current data status"""
try:
status = self.data_integration.get_cache_status()
cache_status = status.get('cache_status', {})
logger.info("=== Data Cache Status ===")
for data_type, symbols_data in cache_status.items():
symbol_info = []
for symbol, info in symbols_data.items():
age = info.get('age_seconds', 0)
has_data = info.get('has_data', False)
if has_data and age < 300: # Recent data
symbol_info.append(f"{symbol}:✅")
else:
symbol_info.append(f"{symbol}:❌")
if symbol_info:
logger.info(f"{data_type}: {', '.join(symbol_info)}")
logger.info("=== Data Provider Status ===")
logger.info("Data provider is running and optimized for BaseDataInput building")
except Exception as e:
logger.error(f"Error logging data status: {e}")
def update_data_cache(self, data_type: str, symbol: str, data: Any, source: str = "orchestrator") -> bool:
"""
Update data cache with new data (simplified approach)
Update data cache through data provider
Args:
data_type: Type of data ('ohlcv_1s', 'technical_indicators', etc.)
symbol: Trading symbol
data: New data to store
source: Source of the data
data: Data to store
source: Source of the update
Returns:
bool: True if successful
bool: True if updated successfully
"""
try:
return self.data_integration.cache.update(data_type, symbol, data, source)
# Invalidate cache when new data arrives
if hasattr(self.data_provider, 'invalidate_ohlcv_cache'):
self.data_provider.invalidate_ohlcv_cache(symbol)
return True
except Exception as e:
logger.error(f"Error updating data cache {data_type}/{symbol}: {e}")
return False
@@ -3929,7 +3882,7 @@ class TradingOrchestrator:
def build_base_data_input(self, symbol: str) -> Optional[Any]:
"""
Build BaseDataInput using simplified data integration (optimized for speed)
Build BaseDataInput using optimized data provider (should be instantaneous)
Args:
symbol: Trading symbol
@@ -3938,8 +3891,8 @@ class TradingOrchestrator:
BaseDataInput with consistent data structure
"""
try:
# Use simplified data integration to build BaseDataInput (should be instantaneous)
return self.data_integration.build_base_data_input(symbol)
# Use data provider's optimized build_base_data_input method
return self.data_provider.build_base_data_input(symbol)
except Exception as e:
logger.error(f"Error building BaseDataInput for {symbol}: {e}")

View File

@@ -1,284 +0,0 @@
"""
Simplified Data Integration for Orchestrator
Replaces complex FIFO queues with simple cache-based data access.
Integrates with SmartDataUpdater for efficient data management.
"""
import logging
import threading
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import pandas as pd
from .data_cache import get_data_cache
from .smart_data_updater import SmartDataUpdater
from .data_models import BaseDataInput, OHLCVBar
logger = logging.getLogger(__name__)
class SimplifiedDataIntegration:
"""
Simplified data integration that replaces FIFO queues with efficient caching
"""
def __init__(self, data_provider, symbols: List[str]):
self.data_provider = data_provider
self.symbols = symbols
self.cache = get_data_cache()
# Initialize smart data updater
self.data_updater = SmartDataUpdater(data_provider, symbols)
# Pre-built OHLCV data cache for instant access
self._ohlcv_cache = {} # {"symbol_timeframe": List[OHLCVBar]}
self._ohlcv_cache_lock = threading.RLock()
self._last_cache_update = {} # {"symbol_timeframe": datetime}
# Register for tick data if available
self._setup_tick_integration()
logger.info(f"SimplifiedDataIntegration initialized for {symbols}")
def start(self):
"""Start the data integration system"""
self.data_updater.start()
logger.info("SimplifiedDataIntegration started")
def stop(self):
"""Stop the data integration system"""
self.data_updater.stop()
logger.info("SimplifiedDataIntegration stopped")
def _setup_tick_integration(self):
"""Setup integration with tick data sources"""
try:
# Register callbacks for tick data if available
if hasattr(self.data_provider, 'register_tick_callback'):
self.data_provider.register_tick_callback(self._on_tick_data)
# Register for WebSocket data if available
if hasattr(self.data_provider, 'register_websocket_callback'):
self.data_provider.register_websocket_callback(self._on_websocket_data)
except Exception as e:
logger.warning(f"Tick integration setup failed: {e}")
def _on_tick_data(self, symbol: str, price: float, volume: float, timestamp: datetime = None):
"""Handle incoming tick data"""
self.data_updater.add_tick(symbol, price, volume, timestamp)
# Invalidate OHLCV cache for this symbol
self._invalidate_ohlcv_cache(symbol)
def _on_websocket_data(self, symbol: str, data: Dict[str, Any]):
"""Handle WebSocket data updates"""
try:
# Extract price and volume from WebSocket data
if 'price' in data and 'volume' in data:
self.data_updater.add_tick(symbol, data['price'], data['volume'])
# Invalidate OHLCV cache for this symbol
self._invalidate_ohlcv_cache(symbol)
except Exception as e:
logger.error(f"Error processing WebSocket data: {e}")
def _invalidate_ohlcv_cache(self, symbol: str):
"""Invalidate OHLCV cache for a symbol when new data arrives"""
try:
with self._ohlcv_cache_lock:
# Remove cached data for all timeframes of this symbol
keys_to_remove = [key for key in self._ohlcv_cache.keys() if key.startswith(f"{symbol}_")]
for key in keys_to_remove:
if key in self._ohlcv_cache:
del self._ohlcv_cache[key]
if key in self._last_cache_update:
del self._last_cache_update[key]
except Exception as e:
logger.error(f"Error invalidating OHLCV cache for {symbol}: {e}")
def build_base_data_input(self, symbol: str) -> Optional[BaseDataInput]:
"""
Build BaseDataInput from cached data (optimized for speed)
Args:
symbol: Trading symbol
Returns:
BaseDataInput with consistent data structure
"""
try:
# Get OHLCV data directly from optimized cache (no validation checks for speed)
ohlcv_1s_list = self._get_ohlcv_data_list(symbol, '1s', 300)
ohlcv_1m_list = self._get_ohlcv_data_list(symbol, '1m', 300)
ohlcv_1h_list = self._get_ohlcv_data_list(symbol, '1h', 300)
ohlcv_1d_list = self._get_ohlcv_data_list(symbol, '1d', 300)
# Get BTC reference data
btc_symbol = 'BTC/USDT'
btc_ohlcv_1s_list = self._get_ohlcv_data_list(btc_symbol, '1s', 300)
if not btc_ohlcv_1s_list:
# Use ETH data as fallback
btc_ohlcv_1s_list = ohlcv_1s_list
# Get cached data (fast lookups)
technical_indicators = self.cache.get('technical_indicators', symbol) or {}
cob_data = self.cache.get('cob_data', symbol)
last_predictions = self._get_recent_predictions(symbol)
# Build BaseDataInput (no validation for speed - assume data is good)
base_data = BaseDataInput(
symbol=symbol,
timestamp=datetime.now(),
ohlcv_1s=ohlcv_1s_list,
ohlcv_1m=ohlcv_1m_list,
ohlcv_1h=ohlcv_1h_list,
ohlcv_1d=ohlcv_1d_list,
btc_ohlcv_1s=btc_ohlcv_1s_list,
technical_indicators=technical_indicators,
cob_data=cob_data,
last_predictions=last_predictions
)
return base_data
except Exception as e:
logger.error(f"Error building BaseDataInput for {symbol}: {e}")
return None
def _get_ohlcv_data_list(self, symbol: str, timeframe: str, max_count: int) -> List[OHLCVBar]:
"""Get OHLCV data list from pre-built cache for instant access"""
try:
with self._ohlcv_cache_lock:
cache_key = f"{symbol}_{timeframe}"
# Check if we have fresh cached data (updated within last 5 seconds)
last_update = self._last_cache_update.get(cache_key)
if (last_update and
(datetime.now() - last_update).total_seconds() < 5 and
cache_key in self._ohlcv_cache):
cached_data = self._ohlcv_cache[cache_key]
return cached_data[-max_count:] if len(cached_data) >= max_count else cached_data
# Need to rebuild cache for this symbol/timeframe
data_list = self._build_ohlcv_cache(symbol, timeframe, max_count)
# Cache the result
self._ohlcv_cache[cache_key] = data_list
self._last_cache_update[cache_key] = datetime.now()
return data_list[-max_count:] if len(data_list) >= max_count else data_list
except Exception as e:
logger.error(f"Error getting OHLCV data list for {symbol}/{timeframe}: {e}")
return self._create_dummy_data_list(symbol, timeframe, max_count)
def _build_ohlcv_cache(self, symbol: str, timeframe: str, max_count: int) -> List[OHLCVBar]:
"""Build OHLCV cache from historical and current data"""
try:
data_list = []
# Get historical data first (this should be fast as it's already cached)
historical_df = self.cache.get_historical_data(symbol, timeframe)
if historical_df is not None and not historical_df.empty:
# Convert historical data to OHLCVBar objects
for idx, row in historical_df.tail(max_count - 1).iterrows():
bar = OHLCVBar(
symbol=symbol,
timestamp=idx if hasattr(idx, 'to_pydatetime') else datetime.now(),
open=float(row['open']),
high=float(row['high']),
low=float(row['low']),
close=float(row['close']),
volume=float(row['volume']),
timeframe=timeframe
)
data_list.append(bar)
# Add current data from cache
current_ohlcv = self.cache.get(f'ohlcv_{timeframe}', symbol)
if current_ohlcv and isinstance(current_ohlcv, OHLCVBar):
data_list.append(current_ohlcv)
# Ensure we have the right amount of data (pad if necessary)
while len(data_list) < max_count:
data_list.extend(self._create_dummy_data_list(symbol, timeframe, max_count - len(data_list)))
return data_list
except Exception as e:
logger.error(f"Error building OHLCV cache for {symbol}/{timeframe}: {e}")
return self._create_dummy_data_list(symbol, timeframe, max_count)
def _try_historical_fallback(self, symbol: str, missing_timeframes: List[str]) -> bool:
"""Try to use historical data for missing timeframes"""
try:
for timeframe in missing_timeframes:
historical_df = self.cache.get_historical_data(symbol, timeframe)
if historical_df is not None and not historical_df.empty:
# Use latest historical data as current data
latest_row = historical_df.iloc[-1]
ohlcv_bar = OHLCVBar(
symbol=symbol,
timestamp=historical_df.index[-1] if hasattr(historical_df.index[-1], 'to_pydatetime') else datetime.now(),
open=float(latest_row['open']),
high=float(latest_row['high']),
low=float(latest_row['low']),
close=float(latest_row['close']),
volume=float(latest_row['volume']),
timeframe=timeframe
)
self.cache.update(f'ohlcv_{timeframe}', symbol, ohlcv_bar, 'historical_fallback')
logger.info(f"Used historical fallback for {symbol} {timeframe}")
return True
except Exception as e:
logger.error(f"Error in historical fallback: {e}")
return False
def _get_recent_predictions(self, symbol: str) -> Dict[str, Any]:
"""Get recent model predictions"""
try:
predictions = {}
# Get predictions from cache
for model_type in ['cnn', 'rl', 'extrema']:
prediction_data = self.cache.get(f'prediction_{model_type}', symbol)
if prediction_data:
predictions[model_type] = prediction_data
return predictions
except Exception as e:
logger.error(f"Error getting recent predictions for {symbol}: {e}")
return {}
def update_model_prediction(self, model_name: str, symbol: str, prediction_data: Any):
"""Update model prediction in cache"""
self.cache.update(f'prediction_{model_name}', symbol, prediction_data, model_name)
def get_current_price(self, symbol: str) -> Optional[float]:
"""Get current price for a symbol"""
return self.data_updater.get_current_price(symbol)
def get_cache_status(self) -> Dict[str, Any]:
"""Get cache status for monitoring"""
return {
'cache_status': self.cache.get_status(),
'updater_status': self.data_updater.get_status()
}
def has_sufficient_data(self, symbol: str) -> bool:
"""Check if we have sufficient data for model predictions"""
required_data = ['ohlcv_1s', 'ohlcv_1m', 'ohlcv_1h', 'ohlcv_1d']
for data_type in required_data:
if not self.cache.has_data(data_type, symbol, max_age_seconds=300):
# Check historical data as fallback
timeframe = data_type.split('_')[1]
if not self.cache.has_historical_data(symbol, timeframe, min_bars=50):
return False
return True
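has_sufficient_data encodes a freshness-or-fallback rule: live data no older than 300 s, else at least 50 historical bars. A standalone restatement against the DataCache API from the first file, with those thresholds copied verbatim:

def sufficient(cache, symbol, timeframes=('1s', '1m', '1h', '1d')):
    """True if every timeframe has fresh live data or enough history."""
    for tf in timeframes:
        fresh = cache.has_data(f'ohlcv_{tf}', symbol, max_age_seconds=300)
        if not fresh and not cache.has_historical_data(symbol, tf, min_bars=50):
            return False
    return True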

View File

@@ -1,358 +0,0 @@
"""
Smart Data Updater
Efficiently manages data updates using:
1. Initial historical data load (once)
2. Live tick data from WebSocket
3. Periodic HTTP updates (1m every minute, 1h every hour)
4. Smart candle construction from ticks
"""
import threading
import time
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
import pandas as pd
import numpy as np
from collections import deque
from .data_cache import get_data_cache, DataCache
from .data_models import OHLCVBar
logger = logging.getLogger(__name__)
class SmartDataUpdater:
"""
Smart data updater that efficiently manages market data with minimal API calls
"""
def __init__(self, data_provider, symbols: List[str]):
self.data_provider = data_provider
self.symbols = symbols
self.cache = get_data_cache()
self.running = False
# Tick data for candle construction
self.tick_buffers: Dict[str, deque] = {symbol: deque(maxlen=1000) for symbol in symbols}
self.tick_locks: Dict[str, threading.Lock] = {symbol: threading.Lock() for symbol in symbols}
# Current candle construction
self.current_candles: Dict[str, Dict[str, Dict]] = {} # {symbol: {timeframe: candle_data}}
self.candle_locks: Dict[str, threading.Lock] = {symbol: threading.Lock() for symbol in symbols}
# Update timers
self.last_updates: Dict[str, Dict[str, datetime]] = {} # {symbol: {timeframe: last_update}}
# Update intervals (in seconds)
self.update_intervals = {
'1s': 10, # Update 1s candles every 10 seconds from ticks
'1m': 60, # Update 1m candles every minute via HTTP
'1h': 3600, # Update 1h candles every hour via HTTP
'1d': 86400 # Update 1d candles every day via HTTP
}
logger.info(f"SmartDataUpdater initialized for {len(symbols)} symbols")
def start(self):
"""Start the smart data updater"""
if self.running:
return
self.running = True
# Load initial historical data
self._load_initial_historical_data()
# Start update threads
self.update_thread = threading.Thread(target=self._update_worker, daemon=True)
self.update_thread.start()
# Start tick processing thread
self.tick_thread = threading.Thread(target=self._tick_processor, daemon=True)
self.tick_thread.start()
logger.info("SmartDataUpdater started")
def stop(self):
"""Stop the smart data updater"""
self.running = False
logger.info("SmartDataUpdater stopped")
def add_tick(self, symbol: str, price: float, volume: float, timestamp: datetime = None):
"""Add tick data for candle construction"""
if symbol not in self.tick_buffers:
return
tick_data = {
'price': price,
'volume': volume,
'timestamp': timestamp or datetime.now()
}
with self.tick_locks[symbol]:
self.tick_buffers[symbol].append(tick_data)
def _load_initial_historical_data(self):
"""Load initial historical data for all symbols and timeframes"""
logger.info("Loading initial historical data...")
timeframes = ['1s', '1m', '1h', '1d']
limits = {'1s': 300, '1m': 300, '1h': 300, '1d': 300}
for symbol in self.symbols:
self.last_updates[symbol] = {}
self.current_candles[symbol] = {}
for timeframe in timeframes:
try:
limit = limits.get(timeframe, 300)
# Get historical data
df = None
if hasattr(self.data_provider, 'get_historical_data'):
df = self.data_provider.get_historical_data(symbol, timeframe, limit=limit)
if df is not None and not df.empty:
# Store in cache
self.cache.store_historical_data(symbol, timeframe, df)
# Update current candle data from latest bar
latest_bar = df.iloc[-1]
self._update_current_candle_from_bar(symbol, timeframe, latest_bar)
# Update cache with latest OHLCV
ohlcv_bar = self._df_row_to_ohlcv_bar(symbol, timeframe, latest_bar, df.index[-1])
self.cache.update(f'ohlcv_{timeframe}', symbol, ohlcv_bar, 'historical')
self.last_updates[symbol][timeframe] = datetime.now()
logger.info(f"Loaded {len(df)} {timeframe} bars for {symbol}")
else:
logger.warning(f"No historical data for {symbol} {timeframe}")
except Exception as e:
logger.error(f"Error loading historical data for {symbol} {timeframe}: {e}")
# Calculate initial technical indicators
self._calculate_technical_indicators()
logger.info("Initial historical data loading completed")
def _update_worker(self):
"""Background worker for periodic data updates"""
while self.running:
try:
current_time = datetime.now()
for symbol in self.symbols:
for timeframe in ['1m', '1h', '1d']: # Skip 1s (built from ticks)
try:
# Check if it's time to update
last_update = self.last_updates[symbol].get(timeframe)
interval = self.update_intervals[timeframe]
if not last_update or (current_time - last_update).total_seconds() >= interval:
self._update_timeframe_data(symbol, timeframe)
self.last_updates[symbol][timeframe] = current_time
except Exception as e:
logger.error(f"Error updating {symbol} {timeframe}: {e}")
# Update technical indicators every minute
if current_time.second < 10: # Update in first 10 seconds of each minute
self._calculate_technical_indicators()
time.sleep(10) # Check every 10 seconds
except Exception as e:
logger.error(f"Error in update worker: {e}")
time.sleep(30)
def _tick_processor(self):
"""Process ticks to build 1s candles"""
while self.running:
try:
current_time = datetime.now()
for symbol in self.symbols:
# Check if it's time to update 1s candles
last_update = self.last_updates[symbol].get('1s')
if not last_update or (current_time - last_update).total_seconds() >= self.update_intervals['1s']:
self._build_1s_candle_from_ticks(symbol)
self.last_updates[symbol]['1s'] = current_time
time.sleep(5) # Process every 5 seconds
except Exception as e:
logger.error(f"Error in tick processor: {e}")
time.sleep(10)
def _update_timeframe_data(self, symbol: str, timeframe: str):
"""Update data for a specific timeframe via HTTP"""
try:
# Get latest data from API
df = None
if hasattr(self.data_provider, 'get_latest_candles'):
df = self.data_provider.get_latest_candles(symbol, timeframe, limit=1)
elif hasattr(self.data_provider, 'get_historical_data'):
df = self.data_provider.get_historical_data(symbol, timeframe, limit=1)
if df is not None and not df.empty:
latest_bar = df.iloc[-1]
# Update current candle
self._update_current_candle_from_bar(symbol, timeframe, latest_bar)
# Update cache
ohlcv_bar = self._df_row_to_ohlcv_bar(symbol, timeframe, latest_bar, df.index[-1])
self.cache.update(f'ohlcv_{timeframe}', symbol, ohlcv_bar, 'http_update')
logger.debug(f"Updated {symbol} {timeframe} via HTTP")
except Exception as e:
logger.error(f"Error updating {symbol} {timeframe} via HTTP: {e}")
def _build_1s_candle_from_ticks(self, symbol: str):
"""Build 1s candle from accumulated ticks"""
try:
with self.tick_locks[symbol]:
ticks = list(self.tick_buffers[symbol])
if not ticks:
return
# Get ticks from last 10 seconds
cutoff_time = datetime.now() - timedelta(seconds=10)
recent_ticks = [tick for tick in ticks if tick['timestamp'] >= cutoff_time]
if not recent_ticks:
return
# Build OHLCV from ticks
prices = [tick['price'] for tick in recent_ticks]
volumes = [tick['volume'] for tick in recent_ticks]
ohlcv_data = {
'open': prices[0],
'high': max(prices),
'low': min(prices),
'close': prices[-1],
'volume': sum(volumes)
}
# Update current candle
with self.candle_locks[symbol]:
self.current_candles[symbol]['1s'] = ohlcv_data
# Create OHLCV bar and update cache
ohlcv_bar = OHLCVBar(
symbol=symbol,
timestamp=recent_ticks[-1]['timestamp'],
open=ohlcv_data['open'],
high=ohlcv_data['high'],
low=ohlcv_data['low'],
close=ohlcv_data['close'],
volume=ohlcv_data['volume'],
timeframe='1s'
)
self.cache.update('ohlcv_1s', symbol, ohlcv_bar, 'tick_constructed')
logger.debug(f"Built 1s candle for {symbol} from {len(recent_ticks)} ticks")
except Exception as e:
logger.error(f"Error building 1s candle from ticks for {symbol}: {e}")
def _update_current_candle_from_bar(self, symbol: str, timeframe: str, bar_data):
"""Update current candle data from a bar"""
try:
with self.candle_locks[symbol]:
self.current_candles[symbol][timeframe] = {
'open': float(bar_data['open']),
'high': float(bar_data['high']),
'low': float(bar_data['low']),
'close': float(bar_data['close']),
'volume': float(bar_data['volume'])
}
except Exception as e:
logger.error(f"Error updating current candle for {symbol} {timeframe}: {e}")
def _df_row_to_ohlcv_bar(self, symbol: str, timeframe: str, row, timestamp) -> OHLCVBar:
"""Convert DataFrame row to OHLCVBar"""
return OHLCVBar(
symbol=symbol,
timestamp=timestamp if hasattr(timestamp, 'to_pydatetime') else datetime.now(),
open=float(row['open']),
high=float(row['high']),
low=float(row['low']),
close=float(row['close']),
volume=float(row['volume']),
timeframe=timeframe
)
def _calculate_technical_indicators(self):
"""Calculate technical indicators for all symbols"""
try:
for symbol in self.symbols:
# Use 1m historical data for indicators
df = self.cache.get_historical_data(symbol, '1m')
if df is None or len(df) < 20:
continue
indicators = {}
try:
import ta
# RSI
if len(df) >= 14:
indicators['rsi'] = ta.momentum.RSIIndicator(df['close']).rsi().iloc[-1]
# Moving averages
if len(df) >= 20:
indicators['sma_20'] = df['close'].rolling(20).mean().iloc[-1]
if len(df) >= 12:
indicators['ema_12'] = df['close'].ewm(span=12).mean().iloc[-1]
if len(df) >= 26:
indicators['ema_26'] = df['close'].ewm(span=26).mean().iloc[-1]
if 'ema_12' in indicators and 'ema_26' in indicators:
indicators['macd'] = indicators['ema_12'] - indicators['ema_26']
# Bollinger Bands
if len(df) >= 20:
bb_period = 20
bb_std = 2
sma = df['close'].rolling(bb_period).mean()
std = df['close'].rolling(bb_period).std()
indicators['bb_upper'] = (sma + (std * bb_std)).iloc[-1]
indicators['bb_lower'] = (sma - (std * bb_std)).iloc[-1]
indicators['bb_middle'] = sma.iloc[-1]
# Remove NaN values
indicators = {k: float(v) for k, v in indicators.items() if not pd.isna(v)}
if indicators:
self.cache.update('technical_indicators', symbol, indicators, 'calculated')
logger.debug(f"Calculated {len(indicators)} indicators for {symbol}")
except Exception as e:
logger.error(f"Error calculating indicators for {symbol}: {e}")
except Exception as e:
logger.error(f"Error in technical indicators calculation: {e}")
def get_current_price(self, symbol: str) -> Optional[float]:
"""Get current price from latest 1s candle"""
ohlcv_1s = self.cache.get('ohlcv_1s', symbol)
if ohlcv_1s:
return ohlcv_1s.close
return None
def get_status(self) -> Dict[str, Any]:
"""Get updater status"""
status = {
'running': self.running,
'symbols': self.symbols,
'last_updates': self.last_updates,
'tick_buffer_sizes': {symbol: len(buffer) for symbol, buffer in self.tick_buffers.items()},
'cache_status': self.cache.get_status()
}
return status
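_build_1s_candle_from_ticks reduces a rolling 10-second tick window to one OHLCV row. A self-contained sketch of that aggregation (tick dict layout taken from add_tick above):

from datetime import datetime, timedelta

def candle_from_ticks(ticks, window_seconds=10):
    """ticks: list of {'price': float, 'volume': float, 'timestamp': datetime}."""
    cutoff = datetime.now() - timedelta(seconds=window_seconds)
    recent = [t for t in ticks if t['timestamp'] >= cutoff]
    if not recent:
        return None  # no trades in the window; keep the previous candle
    prices = [t['price'] for t in recent]
    return {
        'open': prices[0],
        'high': max(prices),
        'low': min(prices),
        'close': prices[-1],
        'volume': sum(t['volume'] for t in recent),
    }

now = datetime.now()
print(candle_from_ticks([
    {'price': 3500.0, 'volume': 1.0, 'timestamp': now},
    {'price': 3502.5, 'volume': 0.4, 'timestamp': now},
]))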