data input audit, cleanup
This commit is contained in:
159
core/api_rate_limiter.py
Normal file
159
core/api_rate_limiter.py
Normal file
@ -0,0 +1,159 @@
|
||||
"""
|
||||
Simple shared API rate limiter and HTTP client wrapper.
|
||||
|
||||
Provides a singleton via get_rate_limiter() used by data providers
|
||||
to throttle external API calls (e.g., Binance) and centralize retries.
|
||||
|
||||
Windows-safe, ASCII-only logging.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
import logging
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
try:
|
||||
import requests
|
||||
except Exception: # requests should be available; fail lazily in make_request
|
||||
requests = None # type: ignore
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RateLimiter:
|
||||
"""Basic per-key interval limiter with retry/backoff HTTP wrapper.
|
||||
|
||||
- Per-key min interval between calls
|
||||
- Optional simple error backoff per key
|
||||
- Thread-safe
|
||||
"""
|
||||
|
||||
def __init__(self, config: Optional[Dict[str, Dict[str, Any]]] = None) -> None:
|
||||
# Defaults tailored for typical public REST endpoints
|
||||
self._config: Dict[str, Dict[str, Any]] = config or {
|
||||
# Binance REST defaults
|
||||
"binance_api": {
|
||||
"min_interval": 0.25, # seconds between calls (~4 req/s)
|
||||
"timeout": 10.0,
|
||||
"max_retries": 2,
|
||||
"backoff_base": 0.75, # seconds
|
||||
"backoff_factor": 2.0,
|
||||
}
|
||||
}
|
||||
self._lock = threading.Lock()
|
||||
self._last_request_ts: Dict[str, float] = {}
|
||||
self._error_backoff_until: Dict[str, float] = {}
|
||||
|
||||
def _now(self) -> float:
|
||||
return time.monotonic()
|
||||
|
||||
def _get_key_config(self, key: str) -> Dict[str, Any]:
|
||||
return self._config.get(key, {
|
||||
"min_interval": 0.5,
|
||||
"timeout": 10.0,
|
||||
"max_retries": 1,
|
||||
"backoff_base": 0.5,
|
||||
"backoff_factor": 2.0,
|
||||
})
|
||||
|
||||
def can_make_request(self, key: str) -> Tuple[bool, float]:
|
||||
"""Return (can_request, wait_time_seconds)."""
|
||||
cfg = self._get_key_config(key)
|
||||
now = self._now()
|
||||
with self._lock:
|
||||
min_interval = float(cfg.get("min_interval", 0.5))
|
||||
last = self._last_request_ts.get(key, 0.0)
|
||||
ready_at = last + min_interval
|
||||
|
||||
# Respect error backoff if set
|
||||
backoff_until = self._error_backoff_until.get(key, 0.0)
|
||||
ready_at = max(ready_at, backoff_until)
|
||||
|
||||
if now >= ready_at:
|
||||
return True, 0.0
|
||||
return False, max(0.0, ready_at - now)
|
||||
|
||||
def _note_request(self, key: str) -> None:
|
||||
with self._lock:
|
||||
self._last_request_ts[key] = self._now()
|
||||
|
||||
def _note_error(self, key: str, attempt_index: int) -> None:
|
||||
cfg = self._get_key_config(key)
|
||||
base = float(cfg.get("backoff_base", 0.5))
|
||||
factor = float(cfg.get("backoff_factor", 2.0))
|
||||
delay = base * (factor ** max(0, attempt_index))
|
||||
with self._lock:
|
||||
self._error_backoff_until[key] = max(self._error_backoff_until.get(key, 0.0), self._now() + delay)
|
||||
|
||||
def make_request(
|
||||
self,
|
||||
key: str,
|
||||
url: str,
|
||||
method: str = "GET",
|
||||
params: Optional[Dict[str, Any]] = None,
|
||||
data: Optional[Any] = None,
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
) -> Optional["requests.Response"]:
|
||||
"""Perform an HTTP request with per-key rate limiting and retries.
|
||||
|
||||
Returns Response or None on failure.
|
||||
"""
|
||||
cfg = self._get_key_config(key)
|
||||
timeout = float(cfg.get("timeout", 10.0))
|
||||
max_retries = int(cfg.get("max_retries", 1))
|
||||
|
||||
if requests is None:
|
||||
logger.error("requests library not available")
|
||||
return None
|
||||
|
||||
method_upper = (method or "GET").upper()
|
||||
|
||||
for attempt in range(max_retries + 1):
|
||||
can, wait = self.can_make_request(key)
|
||||
if not can:
|
||||
time.sleep(min(wait, 5.0)) # cap local wait
|
||||
|
||||
self._note_request(key)
|
||||
try:
|
||||
resp = requests.request(
|
||||
method=method_upper,
|
||||
url=url,
|
||||
params=params,
|
||||
data=data,
|
||||
headers=headers,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
# If server imposes rate limit or transient error, backoff and retry
|
||||
if resp is None or resp.status_code >= 500 or resp.status_code in (408, 429):
|
||||
logger.warning(
|
||||
"HTTP retry %s for key %s status %s", attempt, key, getattr(resp, "status_code", "n/a")
|
||||
)
|
||||
self._note_error(key, attempt)
|
||||
if attempt < max_retries:
|
||||
continue
|
||||
return resp
|
||||
|
||||
return resp
|
||||
|
||||
except Exception as ex:
|
||||
logger.warning("HTTP error on %s %s: %s", method_upper, url, ex)
|
||||
self._note_error(key, attempt)
|
||||
if attempt < max_retries:
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
_singleton: Optional[RateLimiter] = None
|
||||
|
||||
|
||||
def get_rate_limiter() -> RateLimiter:
|
||||
global _singleton
|
||||
if _singleton is None:
|
||||
_singleton = RateLimiter()
|
||||
return _singleton
|
||||
|
||||
|
@ -2363,6 +2363,12 @@ class TradingOrchestrator:
|
||||
cnn_pred,
|
||||
inference_duration_ms=inference_duration_ms,
|
||||
)
|
||||
# Save audit image of inputs used for this inference
|
||||
try:
|
||||
from utils.audit_plotter import save_inference_audit_image
|
||||
save_inference_audit_image(base_data, model_name=model_name, symbol=symbol, out_root="audit_inputs")
|
||||
except Exception as _audit_ex:
|
||||
logger.debug(f"Audit image save skipped: {str(_audit_ex)}")
|
||||
await self._store_inference_data_async(
|
||||
model_name, model_input, cnn_pred, current_time, symbol
|
||||
)
|
||||
@ -2387,6 +2393,12 @@ class TradingOrchestrator:
|
||||
prediction,
|
||||
inference_duration_ms=inference_duration_ms,
|
||||
)
|
||||
# Save audit image of inputs used for this inference
|
||||
try:
|
||||
from utils.audit_plotter import save_inference_audit_image
|
||||
save_inference_audit_image(base_data, model_name=model_name, symbol=symbol, out_root="audit_inputs")
|
||||
except Exception as _audit_ex:
|
||||
logger.debug(f"Audit image save skipped: {str(_audit_ex)}")
|
||||
# Store input data for RL
|
||||
await self._store_inference_data_async(
|
||||
model_name, model_input, prediction, current_time, symbol
|
||||
@ -2412,6 +2424,12 @@ class TradingOrchestrator:
|
||||
prediction,
|
||||
inference_duration_ms=inference_duration_ms,
|
||||
)
|
||||
# Save audit image of inputs used for this inference
|
||||
try:
|
||||
from utils.audit_plotter import save_inference_audit_image
|
||||
save_inference_audit_image(base_data, model_name=model_name, symbol=symbol, out_root="audit_inputs")
|
||||
except Exception as _audit_ex:
|
||||
logger.debug(f"Audit image save skipped: {str(_audit_ex)}")
|
||||
# Store input data for generic model
|
||||
await self._store_inference_data_async(
|
||||
model_name, model_input, prediction, current_time, symbol
|
||||
|
Reference in New Issue
Block a user