From 3a748daff2aa147243d8d1a992eab9b40222764f Mon Sep 17 00:00:00 2001
From: Dobromir Popov
Date: Sat, 31 May 2025 00:33:07 +0300
Subject: [PATCH] better pivots

---
 core/data_provider.py              | 256 ++++++++++++++++++-----------
 test_pivot_normalization_system.py |   6 +-
 web/dashboard.py                   |  21 ++-
 3 files changed, 175 insertions(+), 108 deletions(-)

diff --git a/core/data_provider.py b/core/data_provider.py
index a7238be..195a942 100644
--- a/core/data_provider.py
+++ b/core/data_provider.py
@@ -646,95 +646,68 @@ class DataProvider:

     # === WILLIAMS MARKET STRUCTURE PIVOT SYSTEM ===

-    def _collect_monthly_1s_data(self, symbol: str) -> Optional[pd.DataFrame]:
-        """Collect 1 month of 1s candles using paginated API calls"""
+    def _collect_monthly_1m_data(self, symbol: str) -> Optional[pd.DataFrame]:
+        """Collect 30 days of 1m candles with smart gap-filling cache system"""
         try:
-            # Check if we have cached monthly data first
-            cached_monthly_data = self._load_monthly_data_from_cache(symbol)
-            if cached_monthly_data is not None:
-                logger.info(f"Using cached monthly 1s data for {symbol}: {len(cached_monthly_data)} candles")
-                return cached_monthly_data
+            # Check for cached data and determine what we need to fetch
+            cached_data = self._load_monthly_data_from_cache(symbol)

-            logger.info(f"Collecting 1 month of 1s data for {symbol}...")
-
-            # Calculate time range (30 days)
             end_time = datetime.now()
             start_time = end_time - timedelta(days=30)

-            all_candles = []
-            current_time = end_time
-            api_calls_made = 0
-            total_candles_collected = 0
+            if cached_data is not None and not cached_data.empty:
+                logger.info(f"Found cached monthly 1m data for {symbol}: {len(cached_data)} candles")
+
+                # Check cache data range
+                cache_start = cached_data['timestamp'].min()
+                cache_end = cached_data['timestamp'].max()
+
+                logger.info(f"Cache range: {cache_start} to {cache_end}")
+
+                # Remove data older than 30 days
+                cached_data = cached_data[cached_data['timestamp'] >= start_time]
+
+                # Check if we need to fill gaps
+                gap_start = cache_end + timedelta(minutes=1)
+
+                if gap_start < end_time:
+                    # Need to fill gap from cache_end to now
+                    logger.info(f"Filling gap from {gap_start} to {end_time}")
+                    gap_data = self._fetch_1m_data_range(symbol, gap_start, end_time)
+
+                    if gap_data is not None and not gap_data.empty:
+                        # Combine cached data with gap data
+                        monthly_df = pd.concat([cached_data, gap_data], ignore_index=True)
+                        monthly_df = monthly_df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True)
+                        logger.info(f"Combined cache + gap: {len(monthly_df)} total candles")
+                    else:
+                        monthly_df = cached_data
+                        logger.info(f"Using cached data only: {len(monthly_df)} candles")
+                else:
+                    monthly_df = cached_data
+                    logger.info(f"Cache is up to date: {len(monthly_df)} candles")
+            else:
+                # No cache - fetch full 30 days
+                logger.info(f"No cache found, collecting full 30 days of 1m data for {symbol}")
+                monthly_df = self._fetch_1m_data_range(symbol, start_time, end_time)

-            # Binance rate limit: 1200 requests/minute = 20/second
-            rate_limit_delay = 0.05  # 50ms between requests
-
-            while current_time > start_time and api_calls_made < 3000:  # Safety limit
-                try:
-                    # Get 1000 candles working backwards
-                    batch_df = self._fetch_1s_batch_with_endtime(symbol, current_time, limit=1000)
-
-                    if batch_df is None or batch_df.empty:
-                        logger.warning(f"No data returned for batch ending at {current_time}")
-                        break
-
-                    api_calls_made += 1
-                    batch_size = len(batch_df)
-                    total_candles_collected += batch_size
-
-                    # Add batch to collection
-                    all_candles.append(batch_df)
-
-                    # Update current time to the earliest timestamp in this batch
-                    earliest_time = batch_df['timestamp'].min()
-                    if earliest_time >= current_time:
-                        logger.warning(f"No progress in time collection, breaking")
-                        break
-
-                    current_time = earliest_time - timedelta(seconds=1)
-
-                    # Rate limiting
-                    time.sleep(rate_limit_delay)
-
-                    # Progress logging every 100 requests
-                    if api_calls_made % 100 == 0:
-                        logger.info(f"Progress: {api_calls_made} API calls, {total_candles_collected} candles collected")
-
-                    # Break if we have enough data (about 2.6M candles for 30 days)
-                    if total_candles_collected >= 2500000:  # 30 days * 24 hours * 3600 seconds ≈ 2.6M
-                        logger.info(f"Collected sufficient data: {total_candles_collected} candles")
-                        break
-
-                except Exception as e:
-                    logger.error(f"Error in batch collection: {e}")
-                    time.sleep(1)  # Wait longer on error
-                    continue
-
-            if not all_candles:
-                logger.error(f"No monthly data collected for {symbol}")
+            if monthly_df is not None and not monthly_df.empty:
+                # Final cleanup: ensure exactly 30 days
+                monthly_df = monthly_df[monthly_df['timestamp'] >= start_time]
+                monthly_df = monthly_df.sort_values('timestamp').reset_index(drop=True)
+
+                logger.info(f"Final dataset: {len(monthly_df)} 1m candles for {symbol}")
+
+                # Update cache
+                self._save_monthly_data_to_cache(symbol, monthly_df)
+
+                return monthly_df
+            else:
+                logger.error(f"No monthly 1m data collected for {symbol}")
                 return None

-            # Combine all batches
-            logger.info(f"Combining {len(all_candles)} batches...")
-            monthly_df = pd.concat(all_candles, ignore_index=True)
-
-            # Sort by timestamp and remove duplicates
-            monthly_df = monthly_df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True)
-
-            # Filter to exactly 30 days
-            cutoff_time = end_time - timedelta(days=30)
-            monthly_df = monthly_df[monthly_df['timestamp'] >= cutoff_time]
-
-            logger.info(f"Successfully collected {len(monthly_df)} 1s candles for {symbol} "
-                       f"({api_calls_made} API calls made)")
-
-            # Cache the monthly data
-            self._save_monthly_data_to_cache(symbol, monthly_df)
-
-            return monthly_df
-
         except Exception as e:
-            logger.error(f"Error collecting monthly 1s data for {symbol}: {e}")
+            logger.error(f"Error collecting monthly 1m data for {symbol}: {e}")
             return None

     def _fetch_1s_batch_with_endtime(self, symbol: str, end_time: datetime, limit: int = 1000) -> Optional[pd.DataFrame]:
@@ -788,6 +761,105 @@ class DataProvider:
             logger.error(f"Error fetching 1s batch for {symbol}: {e}")
             return None

+    def _fetch_1m_data_range(self, symbol: str, start_time: datetime, end_time: datetime) -> Optional[pd.DataFrame]:
+        """Fetch 1m candles for a specific time range with efficient batching"""
+        try:
+            # Convert symbol format for Binance API
+            if '/' in symbol:
+                api_symbol = symbol.replace('/', '')
+            else:
+                api_symbol = symbol
+
+            logger.info(f"Fetching 1m data for {symbol} from {start_time} to {end_time}")
+
+            all_candles = []
+            current_start = start_time
+            batch_size = 1000  # Binance limit
+            api_calls_made = 0
+
+            while current_start < end_time and api_calls_made < 50:  # Safety limit for 30 days
+                try:
+                    # Calculate end time for this batch
+                    batch_end = min(current_start + timedelta(minutes=batch_size), end_time)
+
+                    # Convert to milliseconds
+                    start_timestamp = int(current_start.timestamp() * 1000)
+                    end_timestamp = int(batch_end.timestamp() * 1000)
+
+                    # Binance API call
+                    url = "https://api.binance.com/api/v3/klines"
+                    params = {
+                        'symbol': api_symbol,
+                        'interval': '1m',
+                        'startTime': start_timestamp,
+                        'endTime': end_timestamp,
+                        'limit': batch_size
+                    }
+
+                    headers = {
+                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
+                        'Accept': 'application/json'
+                    }
+
+                    response = requests.get(url, params=params, headers=headers, timeout=10)
+                    response.raise_for_status()
+
+                    data = response.json()
+                    api_calls_made += 1
+
+                    if not data:
+                        logger.warning(f"No data returned for batch {current_start} to {batch_end}")
+                        break
+
+                    # Convert to DataFrame
+                    batch_df = pd.DataFrame(data, columns=[
+                        'timestamp', 'open', 'high', 'low', 'close', 'volume',
+                        'close_time', 'quote_volume', 'trades', 'taker_buy_base',
+                        'taker_buy_quote', 'ignore'
+                    ])
+
+                    # Process columns
+                    batch_df['timestamp'] = pd.to_datetime(batch_df['timestamp'], unit='ms')
+                    for col in ['open', 'high', 'low', 'close', 'volume']:
+                        batch_df[col] = batch_df[col].astype(float)
+
+                    # Keep only OHLCV columns
+                    batch_df = batch_df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
+
+                    all_candles.append(batch_df)
+
+                    # Move to next batch (add 1 minute to avoid overlap)
+                    current_start = batch_end + timedelta(minutes=1)
+
+                    # Rate limiting (Binance allows 1200/min)
+                    time.sleep(0.05)  # 50ms delay
+
+                    # Progress logging
+                    if api_calls_made % 10 == 0:
+                        total_candles = sum(len(df) for df in all_candles)
+                        logger.info(f"Progress: {api_calls_made} API calls, {total_candles} candles collected")
+
+                except Exception as e:
+                    logger.error(f"Error in batch {current_start} to {batch_end}: {e}")
+                    current_start += timedelta(minutes=batch_size)
+                    time.sleep(1)  # Wait longer on error
+                    continue
+
+            if not all_candles:
+                logger.error(f"No data collected for {symbol}")
+                return None
+
+            # Combine all batches
+            df = pd.concat(all_candles, ignore_index=True)
+            df = df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True)
+
+            logger.info(f"Successfully fetched {len(df)} 1m candles for {symbol} ({api_calls_made} API calls)")
+            return df
+
+        except Exception as e:
+            logger.error(f"Error fetching 1m data range for {symbol}: {e}")
+            return None
+
     def _extract_pivot_bounds_from_monthly_data(self, symbol: str, monthly_data: pd.DataFrame) -> Optional[PivotBounds]:
         """Extract pivot bounds using Williams Market Structure analysis"""
         try:
@@ -958,8 +1030,8 @@ class DataProvider:
     def _refresh_pivot_bounds_for_symbol(self, symbol: str):
         """Refresh pivot bounds for a specific symbol"""
         try:
-            # Collect monthly 1s data
-            monthly_data = self._collect_monthly_1s_data(symbol)
+            # Collect monthly 1m data
+            monthly_data = self._collect_monthly_1m_data(symbol)
             if monthly_data is None or monthly_data.empty:
                 logger.warning(f"Could not collect monthly data for {symbol}")
                 return
@@ -1074,17 +1146,13 @@
             logger.warning(f"Error saving pivot bounds to cache for {symbol}: {e}")

     def _load_monthly_data_from_cache(self, symbol: str) -> Optional[pd.DataFrame]:
-        """Load monthly 1s data from cache"""
+        """Load monthly 1m data from cache"""
         try:
-            cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1s.parquet"
+            cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet"
             if cache_file.exists():
-                # Check if cache is recent (less than 1 day old)
-                cache_age = time.time() - cache_file.stat().st_mtime
-                if cache_age < 86400:  # 24 hours
-                    df = pd.read_parquet(cache_file)
-                    return df
-                else:
-                    logger.info(f"Monthly data cache for {symbol} is too old ({cache_age/3600:.1f}h)")
+                df = pd.read_parquet(cache_file)
logger.info(f"Loaded {len(df)} 1m candles from cache for {symbol}") + return df return None @@ -1093,11 +1161,11 @@ class DataProvider: return None def _save_monthly_data_to_cache(self, symbol: str, df: pd.DataFrame): - """Save monthly 1s data to cache""" + """Save monthly 1m data to cache""" try: - cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1s.parquet" + cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet" df.to_parquet(cache_file, index=False) - logger.info(f"Saved {len(df)} monthly 1s candles to cache for {symbol}") + logger.info(f"Saved {len(df)} monthly 1m candles to cache for {symbol}") except Exception as e: logger.warning(f"Error saving monthly data to cache for {symbol}: {e}") diff --git a/test_pivot_normalization_system.py b/test_pivot_normalization_system.py index ceed32e..e47f613 100644 --- a/test_pivot_normalization_system.py +++ b/test_pivot_normalization_system.py @@ -37,7 +37,7 @@ def test_pivot_normalization_system(): # Initialize data provider symbols = ['ETH/USDT'] # Test with ETH only - timeframes = ['1s', '1m', '1h'] + timeframes = ['1s'] logger.info("Initializing DataProvider with pivot-based normalization...") data_provider = DataProvider(symbols=symbols, timeframes=timeframes) @@ -52,11 +52,11 @@ def test_pivot_normalization_system(): try: # This will trigger monthly data collection and pivot analysis logger.info(f"Testing monthly data collection for {symbol}...") - monthly_data = data_provider._collect_monthly_1s_data(symbol) + monthly_data = data_provider._collect_monthly_1m_data(symbol) if monthly_data is not None: print(f"✅ Monthly data collection SUCCESS") - print(f" 📊 Collected {len(monthly_data):,} 1s candles") + print(f" 📊 Collected {len(monthly_data):,} 1m candles") print(f" 📅 Period: {monthly_data['timestamp'].min()} to {monthly_data['timestamp'].max()}") print(f" 💰 Price range: ${monthly_data['low'].min():.2f} - ${monthly_data['high'].max():.2f}") print(f" 📈 Volume range: {monthly_data['volume'].min():.2f} - {monthly_data['volume'].max():.2f}") diff --git a/web/dashboard.py b/web/dashboard.py index 3ccfa75..76e3b5c 100644 --- a/web/dashboard.py +++ b/web/dashboard.py @@ -3010,10 +3010,9 @@ class TradingDashboard: # Update internal model storage for model_type, model_data in loaded_models.items(): model_info = model_data['info'] - logger.info(f"Using best {model_type} model: {model_info.model_name} " - f"(Score: {model_info.metrics.get_composite_score():.3f})") + logger.info(f"Using best {model_type} model: {model_info.model_name} (Score: {model_info.metrics.get_composite_score():.3f})") - else: + else: logger.info("No managed models available, falling back to legacy loading") # Fallback to original model loading logic self._load_legacy_models() @@ -3021,7 +3020,7 @@ class TradingDashboard: except ImportError: logger.warning("ModelManager not available, using legacy model loading") self._load_legacy_models() - except Exception as e: + except Exception as e: logger.error(f"Error loading models via ModelManager: {e}") self._load_legacy_models() @@ -3053,7 +3052,7 @@ class TradingDashboard: with torch.no_grad(): if hasattr(feature_matrix, 'shape') and len(feature_matrix.shape) == 2: feature_tensor = torch.FloatTensor(feature_matrix).unsqueeze(0) - else: + else: feature_tensor = torch.FloatTensor(feature_matrix) prediction = self.model(feature_tensor) @@ -3090,7 +3089,7 @@ class TradingDashboard: }) logger.info(f"Loaded CNN model: {model_file}") - except Exception as e: + except 
+                except Exception as e:
                     logger.warning(f"Failed to load CNN model {model_file}: {e}")

         # Check for RL models
@@ -3101,12 +3100,12 @@
                 try:
                     checkpoint_path = os.path.join(rl_models_dir, model_file)

-                    class RLWrapper: 
-                        def __init__(self, checkpoint_path): 
+                    class RLWrapper:
+                        def __init__(self, checkpoint_path):
                             self.checkpoint_path = checkpoint_path
                             self.checkpoint = torch.load(checkpoint_path, map_location='cpu')

-                        def predict(self, feature_matrix): 
+                        def predict(self, feature_matrix):
                             # Mock RL prediction
                             if hasattr(feature_matrix, 'shape'):
                                 state_sum = np.sum(feature_matrix) % 100
@@ -3117,7 +3116,7 @@
                                 action_probs = [0.1, 0.1, 0.8]  # BUY
                             elif state_sum < 30:
                                 action_probs = [0.8, 0.1, 0.1]  # SELL
-                            else: 
+                            else:
                                 action_probs = [0.2, 0.6, 0.2]  # HOLD

                             return np.array(action_probs)
@@ -3137,7 +3136,7 @@
                     })
                     logger.info(f"Loaded RL model: {model_file}")

-                except Exception as e: 
+                except Exception as e:
                     logger.warning(f"Failed to load RL model {model_file}: {e}")

         total_models = sum(len(models) for models in self.available_models.values())
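
For readers who want to exercise the same request pattern outside the DataProvider class, the following is a minimal standalone sketch of a paginated 1m kline fetch against the public Binance /api/v3/klines endpoint, similar in spirit to the _fetch_1m_data_range method added above. It is not part of the patch: the names fetch_1m_range, KLINES_URL and OHLCV are illustrative, it assumes unauthenticated REST access, and it expects timezone-aware UTC datetimes.

    # Reference sketch (not from the patch): paginated 1m kline fetch over a UTC range.
    from datetime import datetime, timedelta, timezone

    import pandas as pd
    import requests

    KLINES_URL = "https://api.binance.com/api/v3/klines"
    OHLCV = ["timestamp", "open", "high", "low", "close", "volume"]

    def fetch_1m_range(symbol: str, start: datetime, end: datetime) -> pd.DataFrame:
        """Fetch 1m candles in pages of up to 1000 and return a deduplicated OHLCV frame.

        start/end must be timezone-aware UTC datetimes.
        """
        frames = []
        cursor = start
        while cursor < end:
            params = {
                "symbol": symbol.replace("/", ""),
                "interval": "1m",
                "startTime": int(cursor.timestamp() * 1000),
                "endTime": int(end.timestamp() * 1000),
                "limit": 1000,
            }
            resp = requests.get(KLINES_URL, params=params, timeout=10)
            resp.raise_for_status()
            rows = resp.json()
            if not rows:
                break
            page = pd.DataFrame(rows).iloc[:, :6].copy()   # open time + OHLCV columns
            page.columns = OHLCV
            page["timestamp"] = pd.to_datetime(page["timestamp"], unit="ms")
            page[OHLCV[1:]] = page[OHLCV[1:]].astype(float)
            frames.append(page)
            # Advance one minute past the last candle returned in this page.
            last = page["timestamp"].iloc[-1].to_pydatetime().replace(tzinfo=timezone.utc)
            cursor = last + timedelta(minutes=1)
            if len(rows) < 1000:                           # short page -> no more data
                break
        if not frames:
            return pd.DataFrame(columns=OHLCV)
        out = pd.concat(frames, ignore_index=True)
        return out.drop_duplicates(subset=["timestamp"]).sort_values("timestamp").reset_index(drop=True)

    # Example: last 24 hours of ETH/USDT
    # end = datetime.now(timezone.utc)
    # df = fetch_1m_range("ETH/USDT", end - timedelta(days=1), end)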
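A companion sketch of the gap-filling cache top-up that _collect_monthly_1m_data performs: trim the cached frame to the rolling 30-day window, fetch only the missing tail, then deduplicate and rewrite the parquet file. It builds on the fetch_1m_range sketch above (same imports and OHLCV constant); refresh_monthly_cache, the cache path handling and the 30-day default are illustrative assumptions, not the provider's actual cache layout.

    # Reference sketch (not from the patch): gap-filling top-up of a 1m candle cache.
    from pathlib import Path

    def refresh_monthly_cache(symbol: str, cache_file: Path, days: int = 30) -> pd.DataFrame:
        end = datetime.now(timezone.utc)
        start = end - timedelta(days=days)
        cached = pd.read_parquet(cache_file) if cache_file.exists() else pd.DataFrame(columns=OHLCV)
        # Drop rows that have fallen out of the rolling window.
        if not cached.empty:
            cached = cached[cached["timestamp"] >= start.replace(tzinfo=None)]
        if cached.empty:
            fresh = fetch_1m_range(symbol, start, end)      # cold start: fetch the full window
        else:
            last = cached["timestamp"].max().to_pydatetime().replace(tzinfo=timezone.utc)
            gap_start = last + timedelta(minutes=1)
            fresh = fetch_1m_range(symbol, gap_start, end) if gap_start < end else pd.DataFrame(columns=OHLCV)
        merged = pd.concat([cached, fresh], ignore_index=True)
        merged = merged.drop_duplicates(subset=["timestamp"]).sort_values("timestamp").reset_index(drop=True)
        merged.to_parquet(cache_file, index=False)           # requires pyarrow or fastparquet
        return merged

    # Example (hypothetical path):
    # df = refresh_monthly_cache("ETH/USDT", Path("cache/ETHUSDT_monthly_1m.parquet"))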