diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 6efbe72..3ac8efe 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -119,7 +119,12 @@ dexscreener_client = DexscreenerClient() # Initialize Telegram Bot -bot = Bot(token=TELEGRAM_BOT_TOKEN) +# Create a custom connection pool +conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit +timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout + +# Create the bot with the custom connection pool +bot = Bot(TELEGRAM_BOT_TOKEN, request=aiohttp.ClientSession(connector=conn_pool, timeout=timeout).request) # Token addresses (initialize with some known tokens) TOKEN_ADDRESSES = { @@ -151,18 +156,45 @@ async def send_telegram_message(message): async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: global TOKENS_INFO - prices = await get_prices_from_coingecko(token_addresses) + # Skip for USD + prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"} + remaining_tokens = [addr for addr in token_addresses if addr not in prices] + + # Try CoinGecko + coingecko_prices = await get_prices_from_coingecko(remaining_tokens) + prices.update(coingecko_prices) + + + # For remaining missing tokens, try Jupiter + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + jupiter_prices = await get_prices_from_jupiter(list(missing_tokens)) + prices.update(jupiter_prices) + + # For tokens not found in CoinGecko, use DexScreener - missing_tokens = set(token_addresses) - set(prices.keys()) + missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys()) if missing_tokens: dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens)) prices.update(dexscreener_prices) + + # For remaining missing tokens, try Raydium + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + raydium_prices = await get_prices_from_raydium(list(missing_tokens)) + prices.update(raydium_prices) + + # For remaining missing tokens, try Orca + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + orca_prices = await get_prices_from_orca(list(missing_tokens)) + prices.update(orca_prices) + # If any tokens are still missing, set their prices to 0 for token in set(token_addresses) - set(prices.keys()): prices[token] = 0.0 logging.warning(f"Price not found for token {token}. Setting to 0.") - for token, price in prices.items(): token_info = TOKENS_INFO.setdefault(token, {}) if 'symbol' not in token_info: @@ -171,43 +203,118 @@ async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: return prices + async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]: - url = "https://api.coingecko.com/api/v3/simple/token_price/solana" - params = { - "contract_addresses": ",".join(token_addresses), - "vs_currencies": DISPLAY_CURRENCY.lower() - } + base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana" prices = {} - + + async def fetch_single_price(session, address): + params = { + "contract_addresses": address, + "vs_currencies": DISPLAY_CURRENCY.lower() + } + try: + async with session.get(base_url, params=params) as response: + if response.status == 200: + data = await response.json() + if address in data and DISPLAY_CURRENCY.lower() in data[address]: + return address, data[address][DISPLAY_CURRENCY.lower()] + else: + logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}") + return address, None + async with aiohttp.ClientSession() as session: - async with session.get(url, params=params) as response: - if response.status == 200: - data = await response.json() - for address, price_info in data.items(): - if DISPLAY_CURRENCY.lower() in price_info: - prices[address] = price_info[DISPLAY_CURRENCY.lower()] - else: - logging.error(f"Failed to get token prices from CoinGecko. Status: {response.status}") - + tasks = [fetch_single_price(session, address) for address in token_addresses] + results = await asyncio.gather(*tasks) + + for address, price in results: + if price is not None: + prices[address] = price + return prices async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]: base_url = "https://api.dexscreener.com/latest/dex/tokens/" prices = {} - async with aiohttp.ClientSession() as session: - tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] - results = await asyncio.gather(*tasks) - - for address, result in zip(token_addresses, results): - if result and 'pairs' in result and result['pairs']: - pair = result['pairs'][0] # Use the first pair (usually the most liquid) - prices[address] = float(pair['priceUsd']) - else: - logging.warning(f"No price data found on DexScreener for token {address}") - + try: + async with aiohttp.ClientSession() as session: + tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] + results = await asyncio.gather(*tasks) + + for address, result in zip(token_addresses, results): + if result and 'pairs' in result and result['pairs']: + pair = result['pairs'][0] # Use the first pair (usually the most liquid) + prices[address] = float(pair['priceUsd']) + else: + logging.warning(f"No price data found on DexScreener for token {address}") + except Exception as e: + logging.error(f"Error fetching token prices from DexScreener: {str(e)}") + return prices +async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]: + url = "https://price.jup.ag/v4/price" + params = { + "ids": ",".join(token_addresses) + } + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + for address, price_info in data.get('data', {}).items(): + if 'price' in price_info: + prices[address] = float(price_info['price']) + else: + logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Jupiter: {str(e)}") + return prices + +# New function for Raydium +async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]: + url = "https://api.raydium.io/v2/main/price" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + for address in token_addresses: + if address in data: + prices[address] = float(data[address]) + else: + logging.error(f"Failed to get token prices from Raydium. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Raydium: {str(e)}") + return prices + +# New function for Orca +async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]: + url = "https://api.orca.so/allTokens" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + for token_info in data: + if token_info['mint'] in token_addresses: + prices[token_info['mint']] = float(token_info['price']) + else: + logging.error(f"Failed to get token prices from Orca. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Orca: {str(e)}") + return prices + + async def fetch_token_data(session, url): try: async with session.get(url) as response: @@ -836,7 +943,17 @@ async def save_log(log): PROCESSING_LOG = False async def process_log(log_result): global PROCESSING_LOG - + tr_details = { + "order_id": None, + "token_in": None, + "token_out": None, + "amount_in": 0, + "amount_out": 0, + "amount_in_USD": 0, + "amount_out_USD": 0, + "percentage_swapped": 0 + } + if log_result['value']['err']: return @@ -854,16 +971,6 @@ async def process_log(log_result): before_source_balance = 0 source_token_change = 0 - tr_details = { - "order_id": None, - "token_in": None, - "token_out": None, - "amount_in": 0, - "amount_out": 0, - "amount_in_USD": 0, - "amount_out_USD": 0, - "percentage_swapped": 0 - } i = 0 while i < len(logs): log_entry = logs[i] @@ -1023,7 +1130,11 @@ async def follow_move(move): # Calculate the amount to swap based on the same percentage as the followed move amount_to_swap = your_balance * (move['percentage_swapped'] / 100) - amount_to_swap = min( min(amount_to_swap, your_balance), 300) + amount_to_swap = min(amount_to_swap, your_balance) # should not happen + + if token_name_in == 'USDC': # max 300 + amount_to_swap = min(amount_to_swap, 300) + # # always get 99% of the amount to swap # amount_to_swap = amount_to_swap * 0.95 @@ -1033,7 +1144,7 @@ async def follow_move(move): amount = int(amount_to_swap * 10**decimals) - if your_balance < amount_to_swap: + if your_balance < amount_to_swap: # should not happen msg = ( f"Warning:\n" f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway." @@ -1151,9 +1262,10 @@ SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes # except websockets.exceptions.ConnectionClosed: # break -first_subscription = True +_first_subscription = True +_process_task = None async def wallet_watch_loop(): - global first_subscription + global first_subscription, process_task reconnect_delay = 5 max_reconnect_delay = 60 @@ -1173,19 +1285,21 @@ async def wallet_watch_loop(): subscription_id = await subscribe(websocket) if subscription_id is not None: await send_telegram_message(f"Solana mainnet connected ({subscription_id})...") - if first_subscription: + if _first_subscription: asyncio.create_task( list_initial_wallet_states()) - first_subscription = False - process_task = asyncio.create_task(process_messages(websocket, subscription_id)) + _first_subscription = False + _process_task = asyncio.create_task(process_messages(websocket, subscription_id)) while True: - try: - await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL) + try:# drop subscription now + await process_messages(websocket, subscription_id) + # await asyncio.run(_process_task) + # await asyncio.wait_for(_process_task, timeout=SUBSCRIBE_INTERVAL) except asyncio.TimeoutError: # Timeout occurred, time to resubscribe if not PROCESSING_LOG: - process_task.cancel() + _process_task.cancel() try: - await process_task + await _process_task except asyncio.CancelledError: pass await unsubscribe(websocket, subscription_id)