better realtime chart data
This commit is contained in:
parent
dc532f8795
commit
d784441a3f
189
realtime.py
189
realtime.py
@ -636,12 +636,15 @@ class CandleCache:
|
|||||||
|
|
||||||
def add_candles(self, interval: str, new_candles: pd.DataFrame):
|
def add_candles(self, interval: str, new_candles: pd.DataFrame):
|
||||||
if interval in self.candles and not new_candles.empty:
|
if interval in self.candles and not new_candles.empty:
|
||||||
|
# Convert DataFrame to list of dicts to avoid pandas issues
|
||||||
for _, row in new_candles.iterrows():
|
for _, row in new_candles.iterrows():
|
||||||
self.candles[interval].append(row)
|
candle_dict = row.to_dict()
|
||||||
|
self.candles[interval].append(candle_dict)
|
||||||
logger.debug(f"Added {len(new_candles)} candles to {interval} cache")
|
logger.debug(f"Added {len(new_candles)} candles to {interval} cache")
|
||||||
|
|
||||||
def get_recent_candles(self, interval: str, count: int = 500) -> pd.DataFrame:
|
def get_recent_candles(self, interval: str, count: int = 500) -> pd.DataFrame:
|
||||||
if interval in self.candles and self.candles[interval]:
|
if interval in self.candles and self.candles[interval]:
|
||||||
|
# Convert deque to list of dicts first
|
||||||
recent_candles = list(self.candles[interval])[-count:]
|
recent_candles = list(self.candles[interval])[-count:]
|
||||||
return pd.DataFrame(recent_candles)
|
return pd.DataFrame(recent_candles)
|
||||||
return pd.DataFrame()
|
return pd.DataFrame()
|
||||||
@ -660,27 +663,30 @@ class CandleCache:
|
|||||||
logger.warning(f"No timestamp column in new candles for {interval}")
|
logger.warning(f"No timestamp column in new candles for {interval}")
|
||||||
return
|
return
|
||||||
|
|
||||||
last_cached_time = None
|
|
||||||
if self.candles[interval]:
|
|
||||||
try:
|
|
||||||
# Get the timestamp from the last cached candle
|
|
||||||
last_cached_candle = self.candles[interval][-1]
|
|
||||||
if isinstance(last_cached_candle, dict) and 'timestamp' in last_cached_candle:
|
|
||||||
last_cached_time = last_cached_candle['timestamp']
|
|
||||||
logger.debug(f"Last cached timestamp for {interval}: {last_cached_time}")
|
|
||||||
except (IndexError, KeyError) as e:
|
|
||||||
logger.error(f"Error accessing timestamp from last cached candle: {e}")
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Only filter if we have a valid last_cached_time
|
# If we have no candles in cache, add all new candles
|
||||||
if last_cached_time is not None:
|
if not self.candles[interval]:
|
||||||
filtered_candles = new_candles[new_candles['timestamp'] > last_cached_time]
|
logger.debug(f"No existing candles for {interval}, adding all {len(new_candles)} candles")
|
||||||
logger.debug(f"Filtered {len(filtered_candles)} new candles for {interval}")
|
|
||||||
self.add_candles(interval, filtered_candles)
|
|
||||||
else:
|
|
||||||
# If no previous candles, add all
|
|
||||||
logger.debug(f"No previous candles, adding all {len(new_candles)} candles to {interval} cache")
|
|
||||||
self.add_candles(interval, new_candles)
|
self.add_candles(interval, new_candles)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Get the timestamp from the last cached candle
|
||||||
|
last_cached_candle = self.candles[interval][-1]
|
||||||
|
if isinstance(last_cached_candle, dict) and 'timestamp' in last_cached_candle:
|
||||||
|
last_cached_time = last_cached_candle['timestamp']
|
||||||
|
logger.debug(f"Last cached timestamp for {interval}: {last_cached_time}")
|
||||||
|
|
||||||
|
# Filter new candles that are after the last cached candle
|
||||||
|
filtered_candles = new_candles[new_candles['timestamp'] > last_cached_time]
|
||||||
|
|
||||||
|
if not filtered_candles.empty:
|
||||||
|
logger.debug(f"Adding {len(filtered_candles)} new candles for {interval}")
|
||||||
|
self.add_candles(interval, filtered_candles)
|
||||||
|
else:
|
||||||
|
logger.debug(f"No new candles to add for {interval}")
|
||||||
|
else:
|
||||||
|
logger.warning(f"Invalid last cached candle format for {interval}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error updating cache for {interval}: {str(e)}")
|
logger.error(f"Error updating cache for {interval}: {str(e)}")
|
||||||
import traceback
|
import traceback
|
||||||
@ -703,6 +709,7 @@ class RealTimeChart:
|
|||||||
}
|
}
|
||||||
self.candle_cache = CandleCache() # Initialize local candle cache
|
self.candle_cache = CandleCache() # Initialize local candle cache
|
||||||
self.historical_data = BinanceHistoricalData() # For fetching historical data
|
self.historical_data = BinanceHistoricalData() # For fetching historical data
|
||||||
|
self.last_cache_save_time = time.time() # Track last time we saved cache to disk
|
||||||
logger.info(f"Initializing RealTimeChart for {symbol}")
|
logger.info(f"Initializing RealTimeChart for {symbol}")
|
||||||
|
|
||||||
# Load historical data for longer timeframes at startup
|
# Load historical data for longer timeframes at startup
|
||||||
@ -1035,13 +1042,17 @@ class RealTimeChart:
|
|||||||
price_stats = self.tick_storage.get_price_stats()
|
price_stats = self.tick_storage.get_price_stats()
|
||||||
time_stats = self.tick_storage.get_time_based_stats()
|
time_stats = self.tick_storage.get_time_based_stats()
|
||||||
|
|
||||||
|
# Periodically save candles to disk
|
||||||
|
if n % 60 == 0: # Every 60 chart updates (~ every 30 seconds at 500ms interval)
|
||||||
|
self._save_candles_to_disk()
|
||||||
|
|
||||||
logger.debug(f"Current price: {current_price}, Stats: {price_stats}")
|
logger.debug(f"Current price: {current_price}, Stats: {price_stats}")
|
||||||
|
|
||||||
fig = make_subplots(
|
fig = make_subplots(
|
||||||
rows=6, cols=1, # Adjusted to accommodate new subcharts
|
rows=6, cols=1, # Adjusted to accommodate new subcharts
|
||||||
vertical_spacing=0.02, # Reduced for better use of vertical space
|
vertical_spacing=0.05, # Reduced for better use of vertical space
|
||||||
subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume', '1s OHLCV', '1m OHLCV', '1h OHLCV', '1d OHLCV'),
|
subplot_titles=(f'{self.symbol} Price ({interval}s)', 'Volume', '1s OHLCV', '1m OHLCV', '1h OHLCV', '1d OHLCV'),
|
||||||
row_heights=[0.5, 0.1, 0.1, 0.1, 0.1, 0.1] # Give more space to main chart
|
row_heights=[0.3, 0.15, 0.15, 0.15, 0.15, 0.15] # Give more space to main chart
|
||||||
)
|
)
|
||||||
|
|
||||||
if not df.empty and len(df) > 0:
|
if not df.empty and len(df) > 0:
|
||||||
@ -1856,44 +1867,134 @@ class RealTimeChart:
|
|||||||
raise
|
raise
|
||||||
|
|
||||||
def _load_historical_data(self):
|
def _load_historical_data(self):
|
||||||
"""Load historical data for 1m, 1h, and 1d timeframes from Binance API"""
|
"""Load historical data for all timeframes from Binance API and local cache"""
|
||||||
try:
|
try:
|
||||||
logger.info(f"Loading historical data for {self.symbol}...")
|
logger.info(f"Loading historical data for {self.symbol}...")
|
||||||
|
|
||||||
# Define intervals to fetch
|
# Define intervals to fetch
|
||||||
intervals = {
|
intervals = {
|
||||||
|
'1s': 1,
|
||||||
'1m': 60,
|
'1m': 60,
|
||||||
'1h': 3600,
|
'1h': 3600,
|
||||||
'1d': 86400
|
'1d': 86400
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# First try to load from local cache files
|
||||||
for interval_key, interval_seconds in intervals.items():
|
for interval_key, interval_seconds in intervals.items():
|
||||||
# Fetch historical data
|
try:
|
||||||
historical_df = self.historical_data.get_historical_candles(
|
cache_file = os.path.join(self.historical_data.cache_dir,
|
||||||
symbol=self.symbol,
|
f"{self.symbol.replace('/', '_')}_{interval_key}_candles.csv")
|
||||||
interval_seconds=interval_seconds,
|
|
||||||
limit=500 # Get 500 candles
|
if os.path.exists(cache_file):
|
||||||
)
|
# Check if cache is fresh (less than 1 day old for anything but 1d, 3 days for 1d)
|
||||||
|
file_age = time.time() - os.path.getmtime(cache_file)
|
||||||
|
max_age = 259200 if interval_key == '1d' else 86400 # 3 days for 1d, 1 day for others
|
||||||
|
|
||||||
|
if file_age <= max_age:
|
||||||
|
cached_df = pd.read_csv(cache_file)
|
||||||
|
if not cached_df.empty:
|
||||||
|
# Convert timestamp string back to datetime
|
||||||
|
if 'timestamp' in cached_df.columns:
|
||||||
|
try:
|
||||||
|
cached_df['timestamp'] = pd.to_datetime(cached_df['timestamp'])
|
||||||
|
except:
|
||||||
|
# If conversion fails, it might already be in the right format
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Only keep the last 500 candles
|
||||||
|
if len(cached_df) > 500:
|
||||||
|
cached_df = cached_df.tail(500)
|
||||||
|
|
||||||
|
# Add to cache
|
||||||
|
for _, row in cached_df.iterrows():
|
||||||
|
candle_dict = row.to_dict()
|
||||||
|
self.candle_cache.candles[interval_key].append(candle_dict)
|
||||||
|
|
||||||
|
# Update ohlcv_cache
|
||||||
|
self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key)
|
||||||
|
logger.info(f"Loaded {len(cached_df)} cached {interval_key} candles from disk")
|
||||||
|
|
||||||
|
# Skip fetching from API if we loaded from cache (except for 1d timeframe which we always refresh)
|
||||||
|
if interval_key != '1d' and interval_key != '1h':
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error loading cached {interval_key} candles: {str(e)}")
|
||||||
|
|
||||||
|
# For timeframes other than 1s, fetch from API as backup or for fresh data
|
||||||
|
for interval_key, interval_seconds in intervals.items():
|
||||||
|
# Skip 1s for API requests
|
||||||
|
if interval_key == '1s':
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Fetch historical data from API
|
||||||
|
try:
|
||||||
|
logger.info(f"Fetching {interval_key} candles from API for {self.symbol}")
|
||||||
|
historical_df = self.historical_data.get_historical_candles(
|
||||||
|
symbol=self.symbol,
|
||||||
|
interval_seconds=interval_seconds,
|
||||||
|
limit=500 # Get 500 candles
|
||||||
|
)
|
||||||
|
|
||||||
|
if not historical_df.empty:
|
||||||
|
logger.info(f"Loaded {len(historical_df)} historical candles for {self.symbol} {interval_key} from API")
|
||||||
|
|
||||||
|
# If we already have data in cache, merge with new data to avoid duplicates
|
||||||
|
if self.ohlcv_cache[interval_key] is not None and not self.ohlcv_cache[interval_key].empty:
|
||||||
|
existing_df = self.ohlcv_cache[interval_key]
|
||||||
|
# Get the latest timestamp from existing data
|
||||||
|
latest_time = existing_df['timestamp'].max()
|
||||||
|
# Only keep newer records from API
|
||||||
|
new_candles = historical_df[historical_df['timestamp'] > latest_time]
|
||||||
|
if not new_candles.empty:
|
||||||
|
logger.info(f"Adding {len(new_candles)} new candles to existing {interval_key} cache")
|
||||||
|
# Add to cache
|
||||||
|
for _, row in new_candles.iterrows():
|
||||||
|
candle_dict = row.to_dict()
|
||||||
|
self.candle_cache.candles[interval_key].append(candle_dict)
|
||||||
|
else:
|
||||||
|
# No existing data, add all from API
|
||||||
|
for _, row in historical_df.iterrows():
|
||||||
|
candle_dict = row.to_dict()
|
||||||
|
self.candle_cache.candles[interval_key].append(candle_dict)
|
||||||
|
|
||||||
|
# Update ohlcv_cache with combined data
|
||||||
|
self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key)
|
||||||
|
logger.info(f"Total {interval_key} candles in cache: {len(self.ohlcv_cache[interval_key])}")
|
||||||
|
else:
|
||||||
|
logger.warning(f"No historical data available from API for {self.symbol} {interval_key}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error fetching {interval_key} data from API: {str(e)}")
|
||||||
|
|
||||||
if not historical_df.empty:
|
|
||||||
logger.info(f"Loaded {len(historical_df)} historical candles for {self.symbol} {interval_key}")
|
|
||||||
# Add to cache
|
|
||||||
for _, row in historical_df.iterrows():
|
|
||||||
# Convert to dict for storage
|
|
||||||
candle_dict = row.to_dict()
|
|
||||||
self.candle_cache.candles[interval_key].append(candle_dict)
|
|
||||||
|
|
||||||
# Update ohlcv_cache
|
|
||||||
self.ohlcv_cache[interval_key] = self.candle_cache.get_recent_candles(interval_key)
|
|
||||||
logger.info(f"Added {len(historical_df)} candles to {interval_key} cache")
|
|
||||||
else:
|
|
||||||
logger.warning(f"No historical data available for {self.symbol} {interval_key}")
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error loading historical data: {str(e)}")
|
logger.error(f"Error in _load_historical_data: {str(e)}")
|
||||||
import traceback
|
import traceback
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
|
|
||||||
|
def _save_candles_to_disk(self):
|
||||||
|
"""Save current candle cache to disk for persistence between runs"""
|
||||||
|
try:
|
||||||
|
# Only save if we have data and sufficient time has passed (every 5 minutes)
|
||||||
|
current_time = time.time()
|
||||||
|
if current_time - self.last_cache_save_time < 300: # 5 minutes
|
||||||
|
return
|
||||||
|
|
||||||
|
# Save each timeframe's candles to disk
|
||||||
|
for interval_key, candles in self.candle_cache.candles.items():
|
||||||
|
if candles:
|
||||||
|
# Convert to DataFrame
|
||||||
|
df = pd.DataFrame(list(candles))
|
||||||
|
if not df.empty:
|
||||||
|
# Save to disk in the cache directory
|
||||||
|
cache_file = os.path.join(self.historical_data.cache_dir,
|
||||||
|
f"{self.symbol.replace('/', '_')}_{interval_key}_candles.csv")
|
||||||
|
df.to_csv(cache_file, index=False)
|
||||||
|
logger.info(f"Saved {len(df)} {interval_key} candles to {cache_file}")
|
||||||
|
|
||||||
|
self.last_cache_save_time = current_time
|
||||||
|
logger.info(f"Saved all candle caches to disk at {datetime.now()}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error saving candles to disk: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
symbols = ["ETH/USDT", "BTC/USDT"]
|
symbols = ["ETH/USDT", "BTC/USDT"]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user