""" Polymarket client for discovering relevant BTC/ETH price markets and fetching live data. Notes: - Uses public Gamma API for market discovery. Endpoints can change; keep URLs configurable. - Avoids any synthetic data. Returns empty lists when nothing is found. - Windows-safe ASCII logging only. Responsibilities: - Search active markets relevant to BTC and ETH price over the next N days - Extract scalar market metadata when available (lower/upper bounds) to derive implied price from share price - Optionally fetch order book or last prices for outcomes using CLOB REST if available This module focuses on read-only public data. Trading functionality is out of scope. """ from __future__ import annotations import logging import time import threading from dataclasses import dataclass from datetime import datetime, timedelta, timezone from typing import Any, Dict, List, Optional, Tuple from .api_rate_limiter import get_rate_limiter logger = logging.getLogger(__name__) @dataclass class ScalarMarketInfo: market_id: str title: str end_date: Optional[datetime] lower_bound: Optional[float] upper_bound: Optional[float] last_price: Optional[float] slug: Optional[str] url: Optional[str] asset: str # "BTC" or "ETH" when detected, else "" class PolymarketClient: """Simple Polymarket data client using public HTTP endpoints. The exact endpoints can change. By default, we use Gamma API for discovery. """ def __init__(self, gamma_base_url: str = "https://gamma-api.polymarket.com", clob_base_url: str = "https://clob.polymarket.com", user_agent: str = "gogo2-dashboard/1.0", ) -> None: self.gamma_base_url = gamma_base_url.rstrip("/") self.clob_base_url = clob_base_url.rstrip("/") self.user_agent = user_agent self._rl = get_rate_limiter() # In-memory cache; no synthetic values, just last successful responses self._last_markets: List[Dict[str, Any]] = [] self._last_scalar_infos: List[ScalarMarketInfo] = [] self._lock = threading.Lock() # ---------------------------- # Public API # ---------------------------- def discover_btc_eth_scalar_markets(self, days_ahead: int = 5) -> List[ScalarMarketInfo]: """Discover scalar markets about BTC/ETH price ending within days_ahead. Strategy: - Query markets with a search filter for keywords (BTC/Bitcoin/ETH/Ethereum) - Keep markets with endDate within now+days_ahead - Attempt to parse scalar bounds and last price Return empty list when nothing suitable is found. """ try: raw_markets = self._fetch_markets() cutoff = datetime.now(timezone.utc) + timedelta(days=max(0, int(days_ahead))) scalar_infos: List[ScalarMarketInfo] = [] for m in raw_markets: try: title = str(m.get("title") or m.get("question") or "") if not title: continue title_l = title.lower() asset = "" if "btc" in title_l or "bitcoin" in title_l: asset = "BTC" elif "eth" in title_l or "ethereum" in title_l: asset = "ETH" else: continue # Parse end date end_dt = self._parse_datetime(m.get("endDate") or m.get("closeDate")) if end_dt and end_dt > cutoff: # Only next N days continue # Heuristic: detect scalar markets and bounds lower_bound, upper_bound = self._extract_bounds(m) last_price = self._extract_last_price(m) # Only accept if appears scalar (has bounds) if lower_bound is None or upper_bound is None: continue scalar_infos.append(ScalarMarketInfo( market_id=str(m.get("id") or m.get("_id") or ""), title=title, end_date=end_dt, lower_bound=lower_bound, upper_bound=upper_bound, last_price=last_price, slug=m.get("slug"), url=self._compose_market_url(m), asset=asset, )) except Exception as inner_ex: logger.debug("Polymarket market parse error: %s", inner_ex) with self._lock: self._last_scalar_infos = scalar_infos return scalar_infos except Exception as ex: logger.error("Polymarket discovery error: %s", ex) return [] def get_cached_scalar_markets(self) -> List[ScalarMarketInfo]: with self._lock: return list(self._last_scalar_infos) def derive_implied_price(self, market: ScalarMarketInfo) -> Optional[float]: """For scalar markets with last_price in [0,1] and known bounds, derive implied USD price. implied = lower + (upper - lower) * last_price Returns None if data insufficient. """ try: if market is None: return None if market.last_price is None: return None if market.lower_bound is None or market.upper_bound is None: return None p = float(market.last_price) lb = float(market.lower_bound) ub = float(market.upper_bound) if ub <= lb: return None if p < 0 or p > 1: # Some APIs might provide unscaled price; ignore if out of [0,1] return None return lb + (ub - lb) * p except Exception as ex: logger.debug("Implied price calc error: %s", ex) return None # ---------------------------- # Internal helpers # ---------------------------- def _fetch_markets(self) -> List[Dict[str, Any]]: """Fetch active markets from Gamma API. Uses conservative params to get recent/active markets and includes descriptions. Returns empty list if any error occurs. """ url = f"{self.gamma_base_url}/markets" params = { "limit": 200, "active": "true", "withDescription": "true", # Some gamma deployments support search param; we do broader fetch then filter locally } headers = {"User-Agent": self.user_agent} resp = self._rl.make_request("polymarket_gamma", url, method="GET", params=params, headers=headers) if resp is None: return [] if resp.status_code != 200: logger.warning("Polymarket markets status: %s", resp.status_code) return [] try: data = resp.json() # type: ignore except Exception as ex: logger.error("Polymarket markets JSON error: %s", ex) return [] # Gamma may return object with `data` or direct list markets: List[Dict[str, Any]] if isinstance(data, dict) and isinstance(data.get("data"), list): markets = data.get("data", []) elif isinstance(data, list): markets = data else: markets = [] with self._lock: self._last_markets = markets return markets def _parse_datetime(self, value: Any) -> Optional[datetime]: if not value: return None try: # Common ISO-8601 variants s = str(value) # Ensure Z -> +00:00 for Python <3.11 compatibility if s.endswith("Z"): s = s.replace("Z", "+00:00") return datetime.fromisoformat(s) except Exception: return None def _extract_bounds(self, market: Dict[str, Any]) -> Tuple[Optional[float], Optional[float]]: """Attempt to extract scalar bounds from various field names used in Polymarket responses.""" lb = market.get("lowerBound") or market.get("min") or market.get("lower") ub = market.get("upperBound") or market.get("max") or market.get("upper") try: return (float(lb) if lb is not None else None, float(ub) if ub is not None else None) except Exception: return (None, None) def _extract_last_price(self, market: Dict[str, Any]) -> Optional[float]: """Attempt to extract last traded share price in [0,1] for scalar markets.""" # Some APIs expose price under `price`, some under `outcomePrices` or `lastPrice` price_candidates: List[Any] = [] price_candidates.append(market.get("lastPrice")) price_candidates.append(market.get("price")) # If outcomePrices is present and scalar, the first could be used as indicator op = market.get("outcomePrices") if isinstance(op, list) and op: price_candidates.append(op[0]) for val in price_candidates: try: if val is None: continue f = float(val) if 0.0 <= f <= 1.0: return f except Exception: continue return None def _compose_market_url(self, market: Dict[str, Any]) -> Optional[str]: slug = market.get("slug") if slug: return f"https://polymarket.com/event/{slug}" return None