better pivots

This commit is contained in:
Dobromir Popov 2025-05-31 00:33:07 +03:00
parent 7a0e468c3e
commit 3a748daff2
3 changed files with 175 additions and 108 deletions

View File

@ -646,95 +646,68 @@ class DataProvider:
# === WILLIAMS MARKET STRUCTURE PIVOT SYSTEM ===
def _collect_monthly_1s_data(self, symbol: str) -> Optional[pd.DataFrame]:
"""Collect 1 month of 1s candles using paginated API calls"""
def _collect_monthly_1m_data(self, symbol: str) -> Optional[pd.DataFrame]:
"""Collect 30 days of 1m candles with smart gap-filling cache system"""
try:
# Check if we have cached monthly data first
cached_monthly_data = self._load_monthly_data_from_cache(symbol)
if cached_monthly_data is not None:
logger.info(f"Using cached monthly 1s data for {symbol}: {len(cached_monthly_data)} candles")
return cached_monthly_data
# Check for cached data and determine what we need to fetch
cached_data = self._load_monthly_data_from_cache(symbol)
logger.info(f"Collecting 1 month of 1s data for {symbol}...")
# Calculate time range (30 days)
end_time = datetime.now()
start_time = end_time - timedelta(days=30)
all_candles = []
current_time = end_time
api_calls_made = 0
total_candles_collected = 0
if cached_data is not None and not cached_data.empty:
logger.info(f"Found cached monthly 1m data for {symbol}: {len(cached_data)} candles")
# Binance rate limit: 1200 requests/minute = 20/second
rate_limit_delay = 0.05 # 50ms between requests
# Check cache data range
cache_start = cached_data['timestamp'].min()
cache_end = cached_data['timestamp'].max()
while current_time > start_time and api_calls_made < 3000: # Safety limit
try:
# Get 1000 candles working backwards
batch_df = self._fetch_1s_batch_with_endtime(symbol, current_time, limit=1000)
logger.info(f"Cache range: {cache_start} to {cache_end}")
if batch_df is None or batch_df.empty:
logger.warning(f"No data returned for batch ending at {current_time}")
break
# Remove data older than 30 days
cached_data = cached_data[cached_data['timestamp'] >= start_time]
api_calls_made += 1
batch_size = len(batch_df)
total_candles_collected += batch_size
# Check if we need to fill gaps
gap_start = cache_end + timedelta(minutes=1)
# Add batch to collection
all_candles.append(batch_df)
if gap_start < end_time:
# Need to fill gap from cache_end to now
logger.info(f"Filling gap from {gap_start} to {end_time}")
gap_data = self._fetch_1m_data_range(symbol, gap_start, end_time)
# Update current time to the earliest timestamp in this batch
earliest_time = batch_df['timestamp'].min()
if earliest_time >= current_time:
logger.warning(f"No progress in time collection, breaking")
break
current_time = earliest_time - timedelta(seconds=1)
# Rate limiting
time.sleep(rate_limit_delay)
# Progress logging every 100 requests
if api_calls_made % 100 == 0:
logger.info(f"Progress: {api_calls_made} API calls, {total_candles_collected} candles collected")
# Break if we have enough data (about 2.6M candles for 30 days)
if total_candles_collected >= 2500000: # 30 days * 24 hours * 3600 seconds ≈ 2.6M
logger.info(f"Collected sufficient data: {total_candles_collected} candles")
break
except Exception as e:
logger.error(f"Error in batch collection: {e}")
time.sleep(1) # Wait longer on error
continue
if not all_candles:
logger.error(f"No monthly data collected for {symbol}")
return None
# Combine all batches
logger.info(f"Combining {len(all_candles)} batches...")
monthly_df = pd.concat(all_candles, ignore_index=True)
# Sort by timestamp and remove duplicates
if gap_data is not None and not gap_data.empty:
# Combine cached data with gap data
monthly_df = pd.concat([cached_data, gap_data], ignore_index=True)
monthly_df = monthly_df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True)
logger.info(f"Combined cache + gap: {len(monthly_df)} total candles")
else:
monthly_df = cached_data
logger.info(f"Using cached data only: {len(monthly_df)} candles")
else:
monthly_df = cached_data
logger.info(f"Cache is up to date: {len(monthly_df)} candles")
else:
# No cache - fetch full 30 days
logger.info(f"No cache found, collecting full 30 days of 1m data for {symbol}")
monthly_df = self._fetch_1m_data_range(symbol, start_time, end_time)
# Filter to exactly 30 days
cutoff_time = end_time - timedelta(days=30)
monthly_df = monthly_df[monthly_df['timestamp'] >= cutoff_time]
if monthly_df is not None and not monthly_df.empty:
# Final cleanup: ensure exactly 30 days
monthly_df = monthly_df[monthly_df['timestamp'] >= start_time]
monthly_df = monthly_df.sort_values('timestamp').reset_index(drop=True)
logger.info(f"Successfully collected {len(monthly_df)} 1s candles for {symbol} "
f"({api_calls_made} API calls made)")
logger.info(f"Final dataset: {len(monthly_df)} 1m candles for {symbol}")
# Cache the monthly data
# Update cache
self._save_monthly_data_to_cache(symbol, monthly_df)
return monthly_df
else:
logger.error(f"No monthly 1m data collected for {symbol}")
return None
except Exception as e:
logger.error(f"Error collecting monthly 1s data for {symbol}: {e}")
logger.error(f"Error collecting monthly 1m data for {symbol}: {e}")
return None
def _fetch_1s_batch_with_endtime(self, symbol: str, end_time: datetime, limit: int = 1000) -> Optional[pd.DataFrame]:
@ -788,6 +761,105 @@ class DataProvider:
logger.error(f"Error fetching 1s batch for {symbol}: {e}")
return None
def _fetch_1m_data_range(self, symbol: str, start_time: datetime, end_time: datetime) -> Optional[pd.DataFrame]:
"""Fetch 1m candles for a specific time range with efficient batching"""
try:
# Convert symbol format for Binance API
if '/' in symbol:
api_symbol = symbol.replace('/', '')
else:
api_symbol = symbol
logger.info(f"Fetching 1m data for {symbol} from {start_time} to {end_time}")
all_candles = []
current_start = start_time
batch_size = 1000 # Binance limit
api_calls_made = 0
while current_start < end_time and api_calls_made < 50: # Safety limit for 30 days
try:
# Calculate end time for this batch
batch_end = min(current_start + timedelta(minutes=batch_size), end_time)
# Convert to milliseconds
start_timestamp = int(current_start.timestamp() * 1000)
end_timestamp = int(batch_end.timestamp() * 1000)
# Binance API call
url = "https://api.binance.com/api/v3/klines"
params = {
'symbol': api_symbol,
'interval': '1m',
'startTime': start_timestamp,
'endTime': end_timestamp,
'limit': batch_size
}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json'
}
response = requests.get(url, params=params, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
api_calls_made += 1
if not data:
logger.warning(f"No data returned for batch {current_start} to {batch_end}")
break
# Convert to DataFrame
batch_df = pd.DataFrame(data, columns=[
'timestamp', 'open', 'high', 'low', 'close', 'volume',
'close_time', 'quote_volume', 'trades', 'taker_buy_base',
'taker_buy_quote', 'ignore'
])
# Process columns
batch_df['timestamp'] = pd.to_datetime(batch_df['timestamp'], unit='ms')
for col in ['open', 'high', 'low', 'close', 'volume']:
batch_df[col] = batch_df[col].astype(float)
# Keep only OHLCV columns
batch_df = batch_df[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
all_candles.append(batch_df)
# Move to next batch (add 1 minute to avoid overlap)
current_start = batch_end + timedelta(minutes=1)
# Rate limiting (Binance allows 1200/min)
time.sleep(0.05) # 50ms delay
# Progress logging
if api_calls_made % 10 == 0:
total_candles = sum(len(df) for df in all_candles)
logger.info(f"Progress: {api_calls_made} API calls, {total_candles} candles collected")
except Exception as e:
logger.error(f"Error in batch {current_start} to {batch_end}: {e}")
current_start += timedelta(minutes=batch_size)
time.sleep(1) # Wait longer on error
continue
if not all_candles:
logger.error(f"No data collected for {symbol}")
return None
# Combine all batches
df = pd.concat(all_candles, ignore_index=True)
df = df.sort_values('timestamp').drop_duplicates(subset=['timestamp']).reset_index(drop=True)
logger.info(f"Successfully fetched {len(df)} 1m candles for {symbol} ({api_calls_made} API calls)")
return df
except Exception as e:
logger.error(f"Error fetching 1m data range for {symbol}: {e}")
return None
def _extract_pivot_bounds_from_monthly_data(self, symbol: str, monthly_data: pd.DataFrame) -> Optional[PivotBounds]:
"""Extract pivot bounds using Williams Market Structure analysis"""
try:
@ -958,8 +1030,8 @@ class DataProvider:
def _refresh_pivot_bounds_for_symbol(self, symbol: str):
"""Refresh pivot bounds for a specific symbol"""
try:
# Collect monthly 1s data
monthly_data = self._collect_monthly_1s_data(symbol)
# Collect monthly 1m data
monthly_data = self._collect_monthly_1m_data(symbol)
if monthly_data is None or monthly_data.empty:
logger.warning(f"Could not collect monthly data for {symbol}")
return
@ -1074,17 +1146,13 @@ class DataProvider:
logger.warning(f"Error saving pivot bounds to cache for {symbol}: {e}")
def _load_monthly_data_from_cache(self, symbol: str) -> Optional[pd.DataFrame]:
"""Load monthly 1s data from cache"""
"""Load monthly 1m data from cache"""
try:
cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1s.parquet"
cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet"
if cache_file.exists():
# Check if cache is recent (less than 1 day old)
cache_age = time.time() - cache_file.stat().st_mtime
if cache_age < 86400: # 24 hours
df = pd.read_parquet(cache_file)
logger.info(f"Loaded {len(df)} 1m candles from cache for {symbol}")
return df
else:
logger.info(f"Monthly data cache for {symbol} is too old ({cache_age/3600:.1f}h)")
return None
@ -1093,11 +1161,11 @@ class DataProvider:
return None
def _save_monthly_data_to_cache(self, symbol: str, df: pd.DataFrame):
"""Save monthly 1s data to cache"""
"""Save monthly 1m data to cache"""
try:
cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1s.parquet"
cache_file = self.monthly_data_cache_dir / f"{symbol.replace('/', '')}_monthly_1m.parquet"
df.to_parquet(cache_file, index=False)
logger.info(f"Saved {len(df)} monthly 1s candles to cache for {symbol}")
logger.info(f"Saved {len(df)} monthly 1m candles to cache for {symbol}")
except Exception as e:
logger.warning(f"Error saving monthly data to cache for {symbol}: {e}")

View File

@ -37,7 +37,7 @@ def test_pivot_normalization_system():
# Initialize data provider
symbols = ['ETH/USDT'] # Test with ETH only
timeframes = ['1s', '1m', '1h']
timeframes = ['1s']
logger.info("Initializing DataProvider with pivot-based normalization...")
data_provider = DataProvider(symbols=symbols, timeframes=timeframes)
@ -52,11 +52,11 @@ def test_pivot_normalization_system():
try:
# This will trigger monthly data collection and pivot analysis
logger.info(f"Testing monthly data collection for {symbol}...")
monthly_data = data_provider._collect_monthly_1s_data(symbol)
monthly_data = data_provider._collect_monthly_1m_data(symbol)
if monthly_data is not None:
print(f"✅ Monthly data collection SUCCESS")
print(f" 📊 Collected {len(monthly_data):,} 1s candles")
print(f" 📊 Collected {len(monthly_data):,} 1m candles")
print(f" 📅 Period: {monthly_data['timestamp'].min()} to {monthly_data['timestamp'].max()}")
print(f" 💰 Price range: ${monthly_data['low'].min():.2f} - ${monthly_data['high'].max():.2f}")
print(f" 📈 Volume range: {monthly_data['volume'].min():.2f} - {monthly_data['volume'].max():.2f}")

View File

@ -3010,8 +3010,7 @@ class TradingDashboard:
# Update internal model storage
for model_type, model_data in loaded_models.items():
model_info = model_data['info']
logger.info(f"Using best {model_type} model: {model_info.model_name} "
f"(Score: {model_info.metrics.get_composite_score():.3f})")
logger.info(f"Using best {model_type} model: {model_info.model_name} (Score: {model_info.metrics.get_composite_score():.3f})")
else:
logger.info("No managed models available, falling back to legacy loading")