wip misc; cleaup launch
This commit is contained in:
253
core/polymarket_client.py
Normal file
253
core/polymarket_client.py
Normal file
@@ -0,0 +1,253 @@
|
||||
"""
|
||||
Polymarket client for discovering relevant BTC/ETH price markets and fetching live data.
|
||||
|
||||
Notes:
|
||||
- Uses public Gamma API for market discovery. Endpoints can change; keep URLs configurable.
|
||||
- Avoids any synthetic data. Returns empty lists when nothing is found.
|
||||
- Windows-safe ASCII logging only.
|
||||
|
||||
Responsibilities:
|
||||
- Search active markets relevant to BTC and ETH price over the next N days
|
||||
- Extract scalar market metadata when available (lower/upper bounds) to derive implied price from share price
|
||||
- Optionally fetch order book or last prices for outcomes using CLOB REST if available
|
||||
|
||||
This module focuses on read-only public data. Trading functionality is out of scope.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
import threading
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from .api_rate_limiter import get_rate_limiter
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScalarMarketInfo:
|
||||
market_id: str
|
||||
title: str
|
||||
end_date: Optional[datetime]
|
||||
lower_bound: Optional[float]
|
||||
upper_bound: Optional[float]
|
||||
last_price: Optional[float]
|
||||
slug: Optional[str]
|
||||
url: Optional[str]
|
||||
asset: str # "BTC" or "ETH" when detected, else ""
|
||||
|
||||
|
||||
class PolymarketClient:
|
||||
"""Simple Polymarket data client using public HTTP endpoints.
|
||||
|
||||
The exact endpoints can change. By default, we use Gamma API for discovery.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
gamma_base_url: str = "https://gamma-api.polymarket.com",
|
||||
clob_base_url: str = "https://clob.polymarket.com",
|
||||
user_agent: str = "gogo2-dashboard/1.0",
|
||||
) -> None:
|
||||
self.gamma_base_url = gamma_base_url.rstrip("/")
|
||||
self.clob_base_url = clob_base_url.rstrip("/")
|
||||
self.user_agent = user_agent
|
||||
self._rl = get_rate_limiter()
|
||||
|
||||
# In-memory cache; no synthetic values, just last successful responses
|
||||
self._last_markets: List[Dict[str, Any]] = []
|
||||
self._last_scalar_infos: List[ScalarMarketInfo] = []
|
||||
self._lock = threading.Lock()
|
||||
|
||||
# ----------------------------
|
||||
# Public API
|
||||
# ----------------------------
|
||||
def discover_btc_eth_scalar_markets(self, days_ahead: int = 5) -> List[ScalarMarketInfo]:
|
||||
"""Discover scalar markets about BTC/ETH price ending within days_ahead.
|
||||
|
||||
Strategy:
|
||||
- Query markets with a search filter for keywords (BTC/Bitcoin/ETH/Ethereum)
|
||||
- Keep markets with endDate within now+days_ahead
|
||||
- Attempt to parse scalar bounds and last price
|
||||
Return empty list when nothing suitable is found.
|
||||
"""
|
||||
try:
|
||||
raw_markets = self._fetch_markets()
|
||||
cutoff = datetime.now(timezone.utc) + timedelta(days=max(0, int(days_ahead)))
|
||||
|
||||
scalar_infos: List[ScalarMarketInfo] = []
|
||||
for m in raw_markets:
|
||||
try:
|
||||
title = str(m.get("title") or m.get("question") or "")
|
||||
if not title:
|
||||
continue
|
||||
title_l = title.lower()
|
||||
asset = ""
|
||||
if "btc" in title_l or "bitcoin" in title_l:
|
||||
asset = "BTC"
|
||||
elif "eth" in title_l or "ethereum" in title_l:
|
||||
asset = "ETH"
|
||||
else:
|
||||
continue
|
||||
|
||||
# Parse end date
|
||||
end_dt = self._parse_datetime(m.get("endDate") or m.get("closeDate"))
|
||||
if end_dt and end_dt > cutoff:
|
||||
# Only next N days
|
||||
continue
|
||||
|
||||
# Heuristic: detect scalar markets and bounds
|
||||
lower_bound, upper_bound = self._extract_bounds(m)
|
||||
last_price = self._extract_last_price(m)
|
||||
|
||||
# Only accept if appears scalar (has bounds)
|
||||
if lower_bound is None or upper_bound is None:
|
||||
continue
|
||||
|
||||
scalar_infos.append(ScalarMarketInfo(
|
||||
market_id=str(m.get("id") or m.get("_id") or ""),
|
||||
title=title,
|
||||
end_date=end_dt,
|
||||
lower_bound=lower_bound,
|
||||
upper_bound=upper_bound,
|
||||
last_price=last_price,
|
||||
slug=m.get("slug"),
|
||||
url=self._compose_market_url(m),
|
||||
asset=asset,
|
||||
))
|
||||
except Exception as inner_ex:
|
||||
logger.debug("Polymarket market parse error: %s", inner_ex)
|
||||
|
||||
with self._lock:
|
||||
self._last_scalar_infos = scalar_infos
|
||||
return scalar_infos
|
||||
except Exception as ex:
|
||||
logger.error("Polymarket discovery error: %s", ex)
|
||||
return []
|
||||
|
||||
def get_cached_scalar_markets(self) -> List[ScalarMarketInfo]:
|
||||
with self._lock:
|
||||
return list(self._last_scalar_infos)
|
||||
|
||||
def derive_implied_price(self, market: ScalarMarketInfo) -> Optional[float]:
|
||||
"""For scalar markets with last_price in [0,1] and known bounds, derive implied USD price.
|
||||
|
||||
implied = lower + (upper - lower) * last_price
|
||||
Returns None if data insufficient.
|
||||
"""
|
||||
try:
|
||||
if market is None:
|
||||
return None
|
||||
if market.last_price is None:
|
||||
return None
|
||||
if market.lower_bound is None or market.upper_bound is None:
|
||||
return None
|
||||
p = float(market.last_price)
|
||||
lb = float(market.lower_bound)
|
||||
ub = float(market.upper_bound)
|
||||
if ub <= lb:
|
||||
return None
|
||||
if p < 0 or p > 1:
|
||||
# Some APIs might provide unscaled price; ignore if out of [0,1]
|
||||
return None
|
||||
return lb + (ub - lb) * p
|
||||
except Exception as ex:
|
||||
logger.debug("Implied price calc error: %s", ex)
|
||||
return None
|
||||
|
||||
# ----------------------------
|
||||
# Internal helpers
|
||||
# ----------------------------
|
||||
def _fetch_markets(self) -> List[Dict[str, Any]]:
|
||||
"""Fetch active markets from Gamma API.
|
||||
|
||||
Uses conservative params to get recent/active markets and includes descriptions.
|
||||
Returns empty list if any error occurs.
|
||||
"""
|
||||
url = f"{self.gamma_base_url}/markets"
|
||||
params = {
|
||||
"limit": 200,
|
||||
"active": "true",
|
||||
"withDescription": "true",
|
||||
# Some gamma deployments support search param; we do broader fetch then filter locally
|
||||
}
|
||||
headers = {"User-Agent": self.user_agent}
|
||||
resp = self._rl.make_request("polymarket_gamma", url, method="GET", params=params, headers=headers)
|
||||
if resp is None:
|
||||
return []
|
||||
if resp.status_code != 200:
|
||||
logger.warning("Polymarket markets status: %s", resp.status_code)
|
||||
return []
|
||||
try:
|
||||
data = resp.json() # type: ignore
|
||||
except Exception as ex:
|
||||
logger.error("Polymarket markets JSON error: %s", ex)
|
||||
return []
|
||||
|
||||
# Gamma may return object with `data` or direct list
|
||||
markets: List[Dict[str, Any]]
|
||||
if isinstance(data, dict) and isinstance(data.get("data"), list):
|
||||
markets = data.get("data", [])
|
||||
elif isinstance(data, list):
|
||||
markets = data
|
||||
else:
|
||||
markets = []
|
||||
|
||||
with self._lock:
|
||||
self._last_markets = markets
|
||||
return markets
|
||||
|
||||
def _parse_datetime(self, value: Any) -> Optional[datetime]:
|
||||
if not value:
|
||||
return None
|
||||
try:
|
||||
# Common ISO-8601 variants
|
||||
s = str(value)
|
||||
# Ensure Z -> +00:00 for Python <3.11 compatibility
|
||||
if s.endswith("Z"):
|
||||
s = s.replace("Z", "+00:00")
|
||||
return datetime.fromisoformat(s)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _extract_bounds(self, market: Dict[str, Any]) -> Tuple[Optional[float], Optional[float]]:
|
||||
"""Attempt to extract scalar bounds from various field names used in Polymarket responses."""
|
||||
lb = market.get("lowerBound") or market.get("min") or market.get("lower")
|
||||
ub = market.get("upperBound") or market.get("max") or market.get("upper")
|
||||
try:
|
||||
return (float(lb) if lb is not None else None, float(ub) if ub is not None else None)
|
||||
except Exception:
|
||||
return (None, None)
|
||||
|
||||
def _extract_last_price(self, market: Dict[str, Any]) -> Optional[float]:
|
||||
"""Attempt to extract last traded share price in [0,1] for scalar markets."""
|
||||
# Some APIs expose price under `price`, some under `outcomePrices` or `lastPrice`
|
||||
price_candidates: List[Any] = []
|
||||
price_candidates.append(market.get("lastPrice"))
|
||||
price_candidates.append(market.get("price"))
|
||||
# If outcomePrices is present and scalar, the first could be used as indicator
|
||||
op = market.get("outcomePrices")
|
||||
if isinstance(op, list) and op:
|
||||
price_candidates.append(op[0])
|
||||
for val in price_candidates:
|
||||
try:
|
||||
if val is None:
|
||||
continue
|
||||
f = float(val)
|
||||
if 0.0 <= f <= 1.0:
|
||||
return f
|
||||
except Exception:
|
||||
continue
|
||||
return None
|
||||
|
||||
def _compose_market_url(self, market: Dict[str, Any]) -> Optional[str]:
|
||||
slug = market.get("slug")
|
||||
if slug:
|
||||
return f"https://polymarket.com/event/{slug}"
|
||||
return None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user