From 75565e21d8de61c10c0fa7a35c8ec82ebff7d9ff Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sat, 12 Oct 2024 09:55:45 +0300 Subject: [PATCH] stability; fixing getting actual prices coingeco --- crypto/sol/app.py | 196 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 152 insertions(+), 44 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 6efbe72..d6b4c3c 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -151,18 +151,45 @@ async def send_telegram_message(message): async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: global TOKENS_INFO - prices = await get_prices_from_coingecko(token_addresses) + # Skip for USD + prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"} + remaining_tokens = [addr for addr in token_addresses if addr not in prices] + + # Try CoinGecko + coingecko_prices = await get_prices_from_coingecko(remaining_tokens) + prices.update(coingecko_prices) + + + # For remaining missing tokens, try Jupiter + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + jupiter_prices = await get_prices_from_jupiter(list(missing_tokens)) + prices.update(jupiter_prices) + + # For tokens not found in CoinGecko, use DexScreener - missing_tokens = set(token_addresses) - set(prices.keys()) + missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys()) if missing_tokens: dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens)) prices.update(dexscreener_prices) + + # For remaining missing tokens, try Raydium + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + raydium_prices = await get_prices_from_raydium(list(missing_tokens)) + prices.update(raydium_prices) + + # For remaining missing tokens, try Orca + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + orca_prices = await get_prices_from_orca(list(missing_tokens)) + prices.update(orca_prices) + # If any tokens are still missing, set their prices to 0 for token in set(token_addresses) - set(prices.keys()): prices[token] = 0.0 logging.warning(f"Price not found for token {token}. Setting to 0.") - for token, price in prices.items(): token_info = TOKENS_INFO.setdefault(token, {}) if 'symbol' not in token_info: @@ -171,43 +198,118 @@ async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: return prices + async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]: - url = "https://api.coingecko.com/api/v3/simple/token_price/solana" - params = { - "contract_addresses": ",".join(token_addresses), - "vs_currencies": DISPLAY_CURRENCY.lower() - } + base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana" prices = {} - + + async def fetch_single_price(session, address): + params = { + "contract_addresses": address, + "vs_currencies": DISPLAY_CURRENCY.lower() + } + try: + async with session.get(base_url, params=params) as response: + if response.status == 200: + data = await response.json() + if address in data and DISPLAY_CURRENCY.lower() in data[address]: + return address, data[address][DISPLAY_CURRENCY.lower()] + else: + logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}") + return address, None + async with aiohttp.ClientSession() as session: - async with session.get(url, params=params) as response: - if response.status == 200: - data = await response.json() - for address, price_info in data.items(): - if DISPLAY_CURRENCY.lower() in price_info: - prices[address] = price_info[DISPLAY_CURRENCY.lower()] - else: - logging.error(f"Failed to get token prices from CoinGecko. Status: {response.status}") - + tasks = [fetch_single_price(session, address) for address in token_addresses] + results = await asyncio.gather(*tasks) + + for address, price in results: + if price is not None: + prices[address] = price + return prices async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]: base_url = "https://api.dexscreener.com/latest/dex/tokens/" prices = {} - async with aiohttp.ClientSession() as session: - tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] - results = await asyncio.gather(*tasks) - - for address, result in zip(token_addresses, results): - if result and 'pairs' in result and result['pairs']: - pair = result['pairs'][0] # Use the first pair (usually the most liquid) - prices[address] = float(pair['priceUsd']) - else: - logging.warning(f"No price data found on DexScreener for token {address}") - + try: + async with aiohttp.ClientSession() as session: + tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] + results = await asyncio.gather(*tasks) + + for address, result in zip(token_addresses, results): + if result and 'pairs' in result and result['pairs']: + pair = result['pairs'][0] # Use the first pair (usually the most liquid) + prices[address] = float(pair['priceUsd']) + else: + logging.warning(f"No price data found on DexScreener for token {address}") + except Exception as e: + logging.error(f"Error fetching token prices from DexScreener: {str(e)}") + return prices +async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]: + url = "https://price.jup.ag/v4/price" + params = { + "ids": ",".join(token_addresses) + } + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + for address, price_info in data.get('data', {}).items(): + if 'price' in price_info: + prices[address] = float(price_info['price']) + else: + logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Jupiter: {str(e)}") + return prices + +# New function for Raydium +async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]: + url = "https://api.raydium.io/v2/main/price" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + for address in token_addresses: + if address in data: + prices[address] = float(data[address]) + else: + logging.error(f"Failed to get token prices from Raydium. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Raydium: {str(e)}") + return prices + +# New function for Orca +async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]: + url = "https://api.orca.so/allTokens" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + for token_info in data: + if token_info['mint'] in token_addresses: + prices[token_info['mint']] = float(token_info['price']) + else: + logging.error(f"Failed to get token prices from Orca. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Orca: {str(e)}") + return prices + + async def fetch_token_data(session, url): try: async with session.get(url) as response: @@ -836,7 +938,17 @@ async def save_log(log): PROCESSING_LOG = False async def process_log(log_result): global PROCESSING_LOG - + tr_details = { + "order_id": None, + "token_in": None, + "token_out": None, + "amount_in": 0, + "amount_out": 0, + "amount_in_USD": 0, + "amount_out_USD": 0, + "percentage_swapped": 0 + } + if log_result['value']['err']: return @@ -854,16 +966,6 @@ async def process_log(log_result): before_source_balance = 0 source_token_change = 0 - tr_details = { - "order_id": None, - "token_in": None, - "token_out": None, - "amount_in": 0, - "amount_out": 0, - "amount_in_USD": 0, - "amount_out_USD": 0, - "percentage_swapped": 0 - } i = 0 while i < len(logs): log_entry = logs[i] @@ -1023,7 +1125,11 @@ async def follow_move(move): # Calculate the amount to swap based on the same percentage as the followed move amount_to_swap = your_balance * (move['percentage_swapped'] / 100) - amount_to_swap = min( min(amount_to_swap, your_balance), 300) + amount_to_swap = min(amount_to_swap, your_balance) # should not happen + + if token_name_in == 'USDC': # max 300 + amount_to_swap = min(amount_to_swap, 300) + # # always get 99% of the amount to swap # amount_to_swap = amount_to_swap * 0.95 @@ -1033,7 +1139,7 @@ async def follow_move(move): amount = int(amount_to_swap * 10**decimals) - if your_balance < amount_to_swap: + if your_balance < amount_to_swap: # should not happen msg = ( f"Warning:\n" f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway." @@ -1178,8 +1284,10 @@ async def wallet_watch_loop(): first_subscription = False process_task = asyncio.create_task(process_messages(websocket, subscription_id)) while True: - try: - await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL) + try:# drop subscription now + await process_messages(websocket, subscription_id) + # await asyncio.run(process_task) + # await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL) except asyncio.TimeoutError: # Timeout occurred, time to resubscribe if not PROCESSING_LOG: