candles cache

This commit is contained in:
Dobromir Popov 2025-03-18 23:40:14 +02:00
parent 3347ecb494
commit c4a803876e


@@ -14,6 +14,9 @@ import numpy as np
from collections import deque
import time
from threading import Thread
import requests
import os
from datetime import datetime, timedelta
# Configure logging with more detailed format
logging.basicConfig(
@@ -599,11 +602,11 @@ class RealTimeChart:
            '1d': None
        }
        self.candle_cache = CandleCache()  # Initialize local candle cache
        self.historical_data = BinanceHistoricalData()  # For fetching historical data
        logger.info(f"Initializing RealTimeChart for {symbol}")
        # We'll populate the cache as data comes in rather than trying to load on startup
        # when there might not be any ticks available yet
        logger.info(f"Cache will be populated as data becomes available for {symbol}")
        # Load historical data for longer timeframes at startup
        self._load_historical_data()
        # Button style
        button_style = {
@@ -1152,6 +1155,186 @@ class RealTimeChart:
            # Re-raise other exceptions
            raise

    def _load_historical_data(self):
        """Load historical data for 1m, 1h, and 1d timeframes from the Binance API"""
        try:
            logger.info(f"Loading historical data for {self.symbol}...")
            # Define intervals to fetch (in seconds)
            intervals = {
                '1m': 60,
                '1h': 3600,
                '1d': 86400
            }
            for interval_key, interval_seconds in intervals.items():
                # Fetch historical data
                historical_df = self.historical_data.get_historical_candles(
                    symbol=self.symbol,
                    interval_seconds=interval_seconds,
                    limit=500  # Get 500 candles
                )
                if not historical_df.empty:
                    logger.info(f"Loaded {len(historical_df)} historical candles for {self.symbol} {interval_key}")
                    # Add each row to the cache as a dict
                    for _, row in historical_df.iterrows():
                        candle_dict = row.to_dict()
                        self.candle_cache.candles[interval_key].append(candle_dict)
                    # Update ohlcv_cache from the candle cache
                    self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key)
                    logger.info(f"Added {len(historical_df)} candles to {interval_key} cache")
                else:
                    logger.warning(f"No historical data available for {self.symbol} {interval_key}")
        except Exception as e:
            logger.error(f"Error loading historical data: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
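
The method above leans on the CandleCache helper initialized earlier, which is not part of this diff. A minimal sketch of the interface it assumes (a candles dict of per-timeframe deques plus get_recent_candles); the class name, timeframe keys, and maxlen here are illustrative assumptions, not taken from this commit:

from collections import deque
from typing import Dict, List

class CandleCacheSketch:
    """Hypothetical stand-in for the CandleCache used above."""
    def __init__(self, max_candles: int = 5000):  # maxlen is an assumption
        # One bounded deque per supported timeframe
        self.candles: Dict[str, deque] = {
            tf: deque(maxlen=max_candles) for tf in ('1m', '1h', '1d')
        }

    def get_recent_candles(self, interval_key: str, count: int = 500) -> List[dict]:
        # Newest candles sit at the right end of the deque
        return list(self.candles.get(interval_key, deque()))[-count:]
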
class BinanceHistoricalData:
    """Fetch historical candle data from Binance"""
    def __init__(self):
        self.base_url = "https://api.binance.com/api/v3/klines"
        # Create a cache directory if it doesn't exist
        self.cache_dir = os.path.join(os.getcwd(), "cache")
        os.makedirs(self.cache_dir, exist_ok=True)
        logger.info(f"Initialized BinanceHistoricalData with cache directory: {self.cache_dir}")

    def _get_interval_string(self, interval_seconds: int) -> str:
        """Convert an interval in seconds to a Binance interval string"""
        if interval_seconds == 60:       # 1m
            return "1m"
        elif interval_seconds == 3600:   # 1h
            return "1h"
        elif interval_seconds == 86400:  # 1d
            return "1d"
        else:
            # Default to 1m if not recognized
            logger.warning(f"Unrecognized interval {interval_seconds}s, defaulting to 1m")
            return "1m"
    def _get_cache_filename(self, symbol: str, interval: str) -> str:
        """Generate the cache filename for a symbol and interval"""
        # Replace any slashes in the symbol with underscores
        safe_symbol = symbol.replace("/", "_")
        return os.path.join(self.cache_dir, f"{safe_symbol}_{interval}_candles.csv")

    def _load_from_cache(self, symbol: str, interval: str) -> Optional[pd.DataFrame]:
        """Load candle data from cache if available and not expired"""
        filename = self._get_cache_filename(symbol, interval)
        if not os.path.exists(filename):
            logger.debug(f"No cache file found for {symbol} {interval}")
            return None
        # Check freshness: 1d candles may be up to a day old, all other intervals up to an hour
        file_age = time.time() - os.path.getmtime(filename)
        max_age = 86400 if interval == "1d" else 3600
        if file_age > max_age:
            logger.debug(f"Cache for {symbol} {interval} is expired ({file_age:.1f}s old)")
            return None
        try:
            df = pd.read_csv(filename)
            # Convert timestamp strings back to datetime
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            logger.info(f"Loaded {len(df)} candles from cache for {symbol} {interval}")
            return df
        except Exception as e:
            logger.error(f"Error loading from cache: {str(e)}")
            return None

    def _save_to_cache(self, df: pd.DataFrame, symbol: str, interval: str) -> bool:
        """Save candle data to cache"""
        if df.empty:
            logger.warning(f"No data to cache for {symbol} {interval}")
            return False
        filename = self._get_cache_filename(symbol, interval)
        try:
            df.to_csv(filename, index=False)
            logger.info(f"Cached {len(df)} candles for {symbol} {interval} to {filename}")
            return True
        except Exception as e:
            logger.error(f"Error saving to cache: {str(e)}")
            return False
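
The two helpers round-trip a DataFrame through a per-symbol, per-interval CSV. A usage sketch of that round trip (the sample values are made up for illustration):

import pandas as pd

fetcher = BinanceHistoricalData()
sample = pd.DataFrame({
    "timestamp": pd.to_datetime(["2025-03-18 12:00", "2025-03-18 13:00"]),
    "open": [1900.0, 1910.0], "high": [1915.0, 1922.0],
    "low": [1895.0, 1905.0], "close": [1910.0, 1918.0],
    "volume": [1250.4, 980.7],
})
fetcher._save_to_cache(sample, "ETH/USDT", "1h")       # writes cache/ETH_USDT_1h_candles.csv
restored = fetcher._load_from_cache("ETH/USDT", "1h")  # returns None once the file is over an hour old
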
    def get_historical_candles(self, symbol: str, interval_seconds: int, limit: int = 500) -> pd.DataFrame:
        """Get historical candle data for the specified symbol and interval"""
        # Convert to Binance format
        clean_symbol = symbol.replace("/", "")
        interval = self._get_interval_string(interval_seconds)
        # Try to load from cache first
        cached_data = self._load_from_cache(symbol, interval)
        if cached_data is not None and len(cached_data) >= limit:
            return cached_data.tail(limit)
        # Fetch from the API if not cached or the cache has too few rows
        try:
            logger.info(f"Fetching {limit} historical candles for {symbol} ({interval}) from Binance API")
            params = {
                "symbol": clean_symbol,
                "interval": interval,
                "limit": limit
            }
            response = requests.get(self.base_url, params=params)
            response.raise_for_status()  # Raise an exception for HTTP errors
            # Process the data
            candles = response.json()
            if not candles:
                logger.warning(f"No candles returned from Binance for {symbol} {interval}")
                return pd.DataFrame()
            # Convert to DataFrame - Binance returns each kline as:
            # [
            #     1499040000000,      // Open time
            #     "0.01634790",       // Open
            #     "0.80000000",       // High
            #     "0.01575800",       // Low
            #     "0.01577100",       // Close
            #     "148976.11427815",  // Volume
            #     ...                 // Ignore the rest
            # ]
            df = pd.DataFrame(candles, columns=[
                "timestamp", "open", "high", "low", "close", "volume",
                "close_time", "quote_asset_volume", "number_of_trades",
                "taker_buy_base_asset_volume", "taker_buy_quote_asset_volume", "ignore"
            ])
            # Convert types
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            for col in ["open", "high", "low", "close", "volume"]:
                df[col] = df[col].astype(float)
            # Keep only the needed columns
            df = df[["timestamp", "open", "high", "low", "close", "volume"]]
            # Cache the results
            self._save_to_cache(df, symbol, interval)
            logger.info(f"Successfully fetched {len(df)} candles for {symbol} {interval}")
            return df
        except Exception as e:
            logger.error(f"Error fetching historical data for {symbol} {interval}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            return pd.DataFrame()
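
Put together, a caller gets a cache-first fetch with a single call; a usage sketch (symbol and limit chosen purely for illustration):

fetcher = BinanceHistoricalData()
# Serves from cache/BTC_USDT_1d_candles.csv when fresh, otherwise hits the API
daily = fetcher.get_historical_candles("BTC/USDT", interval_seconds=86400, limit=30)
if not daily.empty:
    print(daily[["timestamp", "close"]].tail(3))  # newest candles last
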
async def main():
    symbols = ["ETH/USDT", "BTC/USDT"]
    logger.info(f"Starting application for symbols: {symbols}")