"""Solana wallet-watching helpers.

This module bundles four pieces:

* ``SolanaWS`` - a small JSON-RPC client over a Solana websocket endpoint,
  used to subscribe to logs that mention ``FOLLOWED_WALLET``.
* ``solana_jsonrpc`` - a helper that performs one HTTP JSON-RPC call.
* ``SolanaAPI`` - the watch loop plus transaction-parsing helpers.
* ``SolanaDEX`` - token price lookups and wallet balance summaries.

Importing the module creates the shared ``DEX`` and ``SAPI`` instances.
"""
import sys
import os
import aiohttp
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from solders import message
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
from dexscreener import DexscreenerClient
from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.async_api import AsyncClient
from solana.transaction import Signature
from solana.rpc.websocket_api import connect
from solana.rpc.commitment import Confirmed, Processed
from solana.transaction import Transaction
from spl.token.client import Token
from base64 import b64decode
import base58
from solders.rpc.requests import GetTransaction
from solders.signature import Signature
from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.transaction import VersionedTransaction
from solders.transaction import Transaction
from solders.message import Message
from solders.instruction import Instruction
from solders.hash import Hash
from solders.instruction import CompiledInstruction
import asyncio
import json
import logging
import random
import websockets
from typing import Dict, List, Optional
import requests
from datetime import datetime
from solana.rpc.types import TokenAccountOpts, TxOpts
from typing import List, Dict, Any, Tuple
from typing import Callable
import traceback

# # # solders/solana libs (solana_client) # # #
from spl.token._layouts import MINT_LAYOUT
from solana.rpc.api import Client, Pubkey
from spl.token.async_client import AsyncToken
from spl.token.constants import TOKEN_PROGRAM_ID
from borsh_construct import String, CStruct
# ------------------

logger = logging.getLogger(__name__)

PING_INTERVAL = 30
SUBSCRIBE_INTERVAL = 10*60  # Resubscribe every 10 minutes
RPC_HTTP_TIMEOUT = 30  # seconds; upper bound for one HTTP JSON-RPC round trip

from config import (
    FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS, YOUR_WALLET)
from modules.utils import telegram_utils, async_safe_call

# Use the production Solana RPC endpoint
solana_client = AsyncClient(SOLANA_HTTP_URL)
dexscreener_client = DexscreenerClient()


class SolanaWS:
    """JSON-RPC client over a Solana websocket connection.

    Incoming payloads are pushed onto ``message_queue`` by
    ``receive_messages`` and handed to the ``on_message`` callback by
    ``process_messages``.
    """

    def __init__(self, on_message: Optional[Callable] = None):
        """Store the optional async ``on_message`` callback; no I/O happens here."""
        self.websocket = None
        self.subscription_id = None
        self.message_queue = asyncio.Queue()
        self.on_message = on_message
        self.last_msg_responded = False

    async def connect(self):
        """Connect to a randomly chosen endpoint, retrying every 5 s until one accepts."""
        while True:
            # Chosen outside the try block so the except clause can always name the URL.
            current_url = random.choice(SOLANA_ENDPOINTS)
            try:
                self.websocket = await websockets.connect(
                    current_url, ping_interval=PING_INTERVAL, ping_timeout=10)
                logger.info(f"Connected to Solana websocket: {current_url}")
                return
            except Exception as e:
                logger.error(f"Failed to connect to {current_url}: {e}")
                await asyncio.sleep(5)

    async def ws_jsonrpc(self, method, params=None, doProcessResponse = True):
        """Send one JSON-RPC request over the websocket.

        Args:
            method: RPC method name.
            params: A list of parameters, a single parameter, or ``None``.
            doProcessResponse: When false, return right after sending.

        Returns:
            The ``result`` member of the next frame received, or ``None`` when
            the response is not awaited, carries an ``error``, or has neither
            member.
        """
        if not isinstance(params, list):
            params = [params] if params is not None else []

        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params
        }

        self.last_msg_responded = False
        await self.websocket.send(json.dumps(request))
        if not doProcessResponse:
            return None

        response = await self.websocket.recv()
        response_data = json.loads(response)
        self.last_msg_responded = True

        if 'result' in response_data:
            return response_data['result']
        if 'error' in response_data:
            logger.error(f"Error in WebSocket RPC call: {response_data['error']}")
            return None
        logger.warning(f"Unexpected response: {response_data}")
        return None

    async def subscribe(self):
        """Subscribe to logs mentioning ``FOLLOWED_WALLET`` and store the subscription id."""
        params = [
            {"mentions": [FOLLOWED_WALLET]},
            {"commitment": "confirmed"}
        ]
        # The request is sent without waiting; the reply is then pulled through
        # the regular receive/process path (so ``on_message`` sees it as well).
        await self.ws_jsonrpc("logsSubscribe", params, False)
        await asyncio.sleep(.4)
        await self.receive_messages(True)

        if self.message_queue.empty():
            # The socket closed or the node answered with an error: nothing was
            # queued, so waiting on the queue would block forever.
            logger.error("Failed to subscribe")
            return

        result = await self.process_messages(True)

        if isinstance(result, int) and not isinstance(result, bool) and result > 0:
            self.subscription_id = result
            logger.info(f"Subscription successful. Subscription id: {self.subscription_id}")
        elif result:
            logger.error("already subscribed")
        else:
            logger.error("Failed to subscribe")

    async def unsubscribe(self):
        """Cancel the active log subscription, if there is one."""
        if self.subscription_id:
            result = await self.ws_jsonrpc("logsUnsubscribe", [self.subscription_id])
            if result:
                logger.info(f"Unsubscribed from subscription id: {self.subscription_id}")
                self.subscription_id = None
            else:
                logger.error(f"Failed to unsubscribe from subscription id: {self.subscription_id}")

    async def receive_messages(self, one = False):
        """Read frames from the websocket and queue their payloads.

        Request responses carry their payload in a top-level ``result`` member;
        subscription notifications carry it in ``params.result``. Both are
        queued. The loop ends when the connection closes or a frame cannot be
        read.

        Args:
            one: When true, return after the first queued response, or after
                an error response.
        """
        while True:
            try:
                response = await self.websocket.recv()
                response_data = json.loads(response)
                self.last_msg_responded = True

                if 'result' in response_data:
                    await self.message_queue.put(response_data['result'])
                    if one:
                        break
                    continue

                params = response_data.get('params')
                if isinstance(params, dict) and 'result' in params:
                    # Subscription notification.
                    await self.message_queue.put(params['result'])
                elif 'error' in response_data:
                    logger.error(f"Error in WebSocket RPC call: {response_data['error']}")
                    if one:
                        break
            except websockets.exceptions.ConnectionClosed:
                logger.error("WebSocket connection closed")
                break
            except Exception as e:
                logger.error(f"Error receiving message: {e}")
                break

    async def process_messages(self, one = False):
        """Take payloads off the queue and pass each one to ``on_message``.

        Args:
            one: When true, return the first payload instead of looping forever.
        """
        while True:
            message = await self.message_queue.get()
            if isinstance(message, int) and not isinstance(message, bool):
                logger.info(f"Subscription id: {message}")
            if self.on_message:
                try:
                    await self.on_message(message)
                except Exception as e:
                    # A failing callback must not stop the consumer loop.
                    logger.error(f"Error in on_message callback: {e}")
            logger.info(f"Received message: {message}")
            if one:
                return message

    async def close(self):
        """Close the websocket if one is open."""
        if self.websocket:
            await self.websocket.close()
            logger.info("WebSocket connection closed")


async def solana_jsonrpc(method, params=None, jsonParsed=True):
    """Perform one JSON-RPC call against ``SOLANA_HTTP_URL``.

    A trailing config object is always appended to the parameters:
    ``maxSupportedTransactionVersion: 0`` plus, when ``jsonParsed`` is true,
    ``encoding: jsonParsed``.

    Args:
        method: RPC method name.
        params: A list of parameters, a single parameter, or ``None``.
        jsonParsed: Whether to request the ``jsonParsed`` encoding.

    Returns:
        The ``result`` member of the response, or ``None`` on any failure.
    """
    # Work on a copy so the caller's list is never modified.
    if isinstance(params, list):
        params = list(params)
    else:
        params = [params] if params is not None else []

    # NOTE(review): the config is appended even when the caller's last
    # parameter is already a config object - confirm each method accepts that.
    if jsonParsed:
        params.append({"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0})
    else:
        params.append({"maxSupportedTransactionVersion": 0})

    data = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params
    }

    def _post():
        return requests.post(
            SOLANA_HTTP_URL,
            headers={"Content-Type": "application/json"},
            data=json.dumps(data),
            timeout=RPC_HTTP_TIMEOUT,
        )

    try:
        # ``requests`` blocks, so run it on the default executor instead of
        # stalling the event loop that also drives the websocket.
        response = await asyncio.get_running_loop().run_in_executor(None, _post)
        response.raise_for_status()
        result = response.json()
        if 'result' not in result or 'error' in result:
            logger.error("Error fetching data from Solana RPC: %s", result)
            return None
        return result['result']
    except Exception as e:
        logger.error(f"Error fetching data from Solana RPC: {e}")
        return None


class SolanaAPI:
    """Wallet watch loop and transaction-detail helpers.

    Token metadata, prices and wallet totals are read from and written to the
    shared ``SolanaDEX`` instance stored in ``self.dex``.
    """

    def __init__(self, process_transaction_callback = None, on_initial_subscription_callback = None, on_bot_message=None):
        """Store the callbacks.

        Args:
            process_transaction_callback: Awaited with each websocket payload.
            on_initial_subscription_callback: Invoked through
                ``async_safe_call`` with the subscription id after the first
                successful subscription.
            on_bot_message: Invoked through ``async_safe_call`` with status
                text; a logging fallback is used when it is not callable.
        """
        self.process_transaction = process_transaction_callback
        self.on_initial_subscription = on_initial_subscription_callback

        # Define a default lambda function for on_bot_message
        default_on_bot_message = lambda message: logger.info(f"Bot message: {message}")
        # Use the provided on_bot_message if it's callable, otherwise use the default
        self.on_bot_message = on_bot_message if callable(on_bot_message) else default_on_bot_message

        self.dex = DEX
        self.solana_ws = SolanaWS(on_message=self.process_transaction)

    async def process_messages(self, solana_ws):
        """Forward every payload queued on ``solana_ws`` to ``process_transaction``."""
        while True:
            message = await solana_ws.message_queue.get()
            await self.process_transaction(message)

    _first_subscription = True

    async def wallet_watch_loop(self):
        """Connect, subscribe and pump messages forever, reconnecting after 5 s.

        One iteration ends as soon as either the reader or the consumer task
        finishes (for example when the socket closes). Cancelling the
        coroutine cleans up and then propagates the cancellation.
        """
        solana_ws = SolanaWS(on_message=self.process_transaction)
        first_subscription = True

        while True:
            receive_task = None
            process_task = None
            try:
                await solana_ws.connect()
                await solana_ws.subscribe()

                if first_subscription:
                    await async_safe_call(self.on_initial_subscription, solana_ws.subscription_id)
                    first_subscription = False

                await async_safe_call(self.on_bot_message, f"Solana mainnet connected ({solana_ws.subscription_id})...")

                receive_task = asyncio.create_task(solana_ws.receive_messages())
                process_task = asyncio.create_task(solana_ws.process_messages())

                # The consumer waits on the queue indefinitely, so waiting for
                # BOTH tasks would never return once the reader has stopped.
                done, _ = await asyncio.wait(
                    {receive_task, process_task}, return_when=asyncio.FIRST_COMPLETED)
                for task in done:
                    if not task.cancelled() and task.exception() is not None:
                        raise task.exception()
            except Exception as e:
                logger.error(f"An unexpected error occurred: {e}")
                logger.error("".join(traceback.format_exception(None, e, e.__traceback__)))
            finally:
                await self._teardown_stream(solana_ws, receive_task, process_task)

            await asyncio.sleep(5)

    async def _teardown_stream(self, solana_ws, *tasks):
        """Cancel the pump tasks, unsubscribe, close the socket and announce the reconnect."""
        started = [task for task in tasks if task is not None]
        for task in started:
            if not task.done():
                task.cancel()
        if started:
            # Collect the outcomes so no task is left un-awaited.
            await asyncio.gather(*started, return_exceptions=True)

        try:
            await solana_ws.unsubscribe()
        except Exception as e:
            logger.error(f"An error occurred while unsubscribing: {e}")
        # The id belongs to the connection that is being dropped.
        solana_ws.subscription_id = None

        try:
            if solana_ws.websocket:
                await solana_ws.close()
        except Exception as e:
            logger.error(f"An error occurred while closing the websocket: {e}")

        await async_safe_call(self.on_bot_message, "Reconnecting...")

    async def get_last_transactions(self, account_address, check_interval=300, limit=1000):
        """Poll ``getSignaturesForAddress`` and process signatures not seen before.

        Args:
            account_address: Address whose signatures are listed.
            check_interval: Seconds between two polls.
            limit: Maximum number of signatures requested per poll.
        """
        last_check_time = None
        last_signature = None

        while True:
            current_time = datetime.now()

            if last_check_time is None or (current_time - last_check_time).total_seconds() >= check_interval:
                params = [
                    account_address,
                    {
                        "limit": limit
                    }
                ]

                if last_signature:
                    # ``until`` stops the newest-first listing at the last
                    # signature already handled; ``before`` would page
                    # backwards into older history instead.
                    params[1]["until"] = last_signature

                result = await solana_jsonrpc("getSignaturesForAddress", params)

                if result:
                    for signature in result:
                        if last_signature and signature['signature'] == last_signature:
                            break
                        await self.process_transaction(signature)

                    last_signature = result[0]['signature']

                last_check_time = current_time

            await asyncio.sleep(1)

    @staticmethod
    def _parsed_account_info(account_data_result):
        """Return ``value.data.parsed.info`` from a ``getAccountInfo`` result, or ``{}``."""
        if not isinstance(account_data_result, dict):
            return {}
        value = account_data_result.get('value')
        if not isinstance(value, dict):
            return {}
        data = value.get('data')
        if not isinstance(data, dict):
            return {}
        parsed = data.get('parsed')
        if not isinstance(parsed, dict):
            return {}
        info = parsed.get('info')
        return info if isinstance(info, dict) else {}

    async def get_token_metadata_symbol(self, mint_address):
        """Return the symbol for ``mint_address``, filling ``self.dex.TOKENS_INFO``.

        Returns:
            The cached or newly fetched symbol, or ``None`` when it is unknown
            or the lookup fails.
        """
        tokens_info = self.dex.TOKENS_INFO
        cached = tokens_info.get(mint_address)
        if cached and 'symbol' in cached:
            return cached.get('symbol')

        try:
            account_data_result = await solana_jsonrpc("getAccountInfo", mint_address)
            account_data_info = self._parsed_account_info(account_data_result)

            if 'decimals' in account_data_info:
                tokens_info.setdefault(mint_address, {})['decimals'] = account_data_info['decimals']
            if 'tokenName' in account_data_info:
                tokens_info.setdefault(mint_address, {})['name'] = account_data_info['tokenName']

            # NOTE(review): ``get_token_metadata`` is not defined in this module.
            # It is looked up lazily so a missing helper means "no metadata"
            # rather than a NameError - confirm where it should come from.
            fetch_metadata = globals().get('get_token_metadata')
            metadata = await fetch_metadata(mint_address) if callable(fetch_metadata) else None
            if metadata:
                tokens_info.setdefault(mint_address, {}).update(metadata)
                await save_token_info()

            return tokens_info.get(mint_address, {}).get('symbol')
        except Exception as e:
            logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
            return None

    async def get_transaction_details_rpc(self, tx_signature, readfromDump=False):
        """Fetch a transaction and summarise the swap it contains.

        Args:
            tx_signature: Signature of the transaction to fetch.
            readfromDump: When true and the dump file exists, return its raw
                content instead of querying the RPC node.

        Returns:
            A dict with ``order_id``, ``token_in``, ``token_out``,
            ``amount_in``, ``amount_out``, ``amount_in_USD``,
            ``amount_out_USD`` and ``percentage_swapped`` (other keys are added
            when available), or ``None`` when the transaction cannot be fetched.
        """
        dump_path = './logs/transation_details.json'

        try:
            if readfromDump and os.path.exists(dump_path):
                # NOTE(review): this returns the raw dumped transaction, not the
                # parsed summary produced below - confirm that is intended.
                with open(dump_path, 'r') as f:  # trump_swap_tr_details
                    transaction_details = json.load(f)
                return transaction_details

            transaction_details = await solana_jsonrpc("getTransaction", tx_signature)
            try:
                with open(dump_path, 'w') as f:
                    json.dump(transaction_details, f, indent=2)
            except OSError as e:
                # The dump is a debugging aid; failing to write it is not fatal.
                logging.warning(f"Could not write {dump_path}: {e}")

            if transaction_details is None:
                logging.error(f"Error fetching transaction details for {tx_signature}")
                return None

            # Initialize default result structure
            parsed_result = {
                "order_id": None,
                "token_in": None,
                "token_out": None,
                "amount_in": 0,
                "amount_out": 0,
                "amount_in_USD": 0,
                "amount_out_USD": 0,
                "percentage_swapped": 0
            }

            # ``meta`` may be present but null.
            meta = transaction_details.get("meta") or {}

            # Extract order_id from logs
            for log in meta.get("logMessages") or []:
                if "order_id" in log:
                    parts = log.split(":")
                    if len(parts) > 2:
                        parsed_result["order_id"] = parts[2].strip()
                    break

            # Extract token transfers from innerInstructions
            inner_instructions = meta.get('innerInstructions') or []
            for instruction_set in inner_instructions:
                for instruction in instruction_set.get('instructions', []):
                    if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked':
                        info = instruction['parsed']['info']
                        mint = info['mint']
                        amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals']  # Adjust for decimals

                        # Only token_in is taken here; token_out is determined
                        # by the second method below.
                        if parsed_result["token_in"] is None and amount > 0:
                            parsed_result["token_in"] = mint
                            parsed_result["amount_in"] = amount

            if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
                # Second method: collect every spl-token transfer and use the
                # first one as the input and the last one as the output.
                transfers = self._collect_transfers(inner_instructions)

                # Identify token_in and token_out
                if len(transfers) >= 2:
                    parsed_result["token_in"] = transfers[0]['mint']
                    parsed_result["amount_in"] = transfers[0]['amount']
                    parsed_result["token_out"] = transfers[-1]['mint']
                    parsed_result["amount_out"] = transfers[-1]['amount']

                # If mint is not provided, query the Solana network for the account data
                if transfers and (parsed_result["token_in"] is None or parsed_result["token_out"] is None):
                    # do only first and last transfer
                    for transfer in [transfers[0], transfers[-1]]:
                        if transfer['mint'] is None:
                            await self._resolve_transfer_mint(transfer, parsed_result)

            for balance in meta.get('preTokenBalances') or []:
                if balance.get('mint') == parsed_result["token_in"] and balance.get('owner') == FOLLOWED_WALLET:
                    parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals']
                    break

            # Calculate percentage swapped
            before_source_balance = parsed_result.get("before_source_balance", 0)
            followed_wallet_value = self.dex.FOLLOWED_WALLET_VALUE
            if parsed_result["amount_in"] > 0 and before_source_balance > 0:
                parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / before_source_balance) * 100
            elif followed_wallet_value:
                # calculate based on total wallet value
                parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / followed_wallet_value) * 100
            else:
                logging.error("Error calculating percentage swapped: followed wallet value is 0")

            return parsed_result

        except requests.exceptions.RequestException as e:
            logging.error(f"Error fetching transaction details: {e}")
            return None

    @staticmethod
    def _collect_transfers(inner_instructions):
        """List the spl-token ``transfer``/``transferChecked`` instructions as plain dicts."""
        transfers = []
        for instruction_set in inner_instructions:
            for instruction in instruction_set.get('instructions', []):
                if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']:
                    info = instruction['parsed']['info']
                    amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount'])
                    # A plain ``transfer`` carries no decimals; its amount stays
                    # raw and is scaled once the mint has been resolved.
                    decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0
                    transfers.append({
                        'mint': info.get('mint'),
                        'amount': amount / (10 ** decimals),
                        'source': info['source'],
                        'destination': info['destination']
                    })
        return transfers

    async def _resolve_transfer_mint(self, transfer, parsed_result):
        """Look up the mint of ``transfer['source']`` and fill the missing side of ``parsed_result``."""
        tokens_info = self.dex.TOKENS_INFO

        account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source'])
        account_data_info = self._parsed_account_info(account_data_result)
        if 'mint' not in account_data_info:
            return

        mint = account_data_info['mint']
        transfer['mint'] = mint

        # Fetch metadata only when the token is unknown or lacks its decimals.
        if mint not in tokens_info or 'decimals' not in tokens_info[mint]:
            await self.get_token_metadata_symbol(mint)

        # get actual prices
        current_price = await self.dex.get_token_prices([mint])

        token_entry = tokens_info.get(mint, {})
        if 'decimals' not in token_entry:
            logging.warning(f"Decimals unknown for token {mint}; amount left unscaled")
        amount = transfer['amount'] / 10 ** token_entry.get('decimals', 0)
        price = token_entry.get('price', current_price.get(mint, 0.0))

        if parsed_result["token_in"] is None:
            parsed_result["token_in"] = mint
            parsed_result["symbol_in"] = token_entry.get('symbol')
            parsed_result["amount_in"] = amount
            parsed_result["amount_in_USD"] = amount * price
        elif parsed_result["token_out"] is None:
            parsed_result["token_out"] = mint
            parsed_result["symbol_out"] = token_entry.get('symbol')
            parsed_result["amount_out"] = amount
            parsed_result["amount_out_USD"] = amount * price

    # Sample program logs of a swap:
    # "Program log: Instruction: Swap2",
    # "Program log: order_id: 13985890735038016",
    # "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn", source
    # "Program log: EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", target
    # "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410",
    # "Program log: after_source_balance: 0, after_destination_balance: 472509072",
    # "Program log: source_token_change: 58730110139, destination_token_change: 270131294",

    async def get_transaction_details_info(self, tx_signature_str: str, logs: List[str]) -> Dict[str, Any]:
        """Fetch transaction details (with retries) and recompute ``percentage_swapped``.

        The percentage is ``amount_in / before_source_balance``, or 50 when the
        recorded balance is not positive. ``logs`` is accepted but not used.

        Returns:
            The details dict, or ``None`` when the details never became available.
        """
        tr_info = await self.get_transaction_details_with_retry(tx_signature_str)
        if tr_info is None:
            logging.error(f"Error calculating percentage swapped: no details for {tx_signature_str}")
            return None

        try:
            tr_info['percentage_swapped'] = (tr_info['amount_in'] / tr_info['before_source_balance']) * 100 if tr_info['before_source_balance'] > 0 else 50
        except Exception as e:
            logging.error(f"Error calculating percentage swapped: {e}")
        return tr_info

    # def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
    #     pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', [])
    #     for balance in pre_balances:
    #         if balance['mint'] == token:
    #             return float(balance['uiTokenAmount']['amount'])
    #     return 0.0

    async def get_transaction_details_with_retry(self, transaction_id, retry_delay = 5, max_retries = 16):
        """Call ``get_transaction_details_rpc`` until it returns a result.

        Args:
            transaction_id: Signature of the transaction.
            retry_delay: Base delay in seconds; it grows by 20 % per attempt.
            max_retries: Maximum number of attempts.

        Returns:
            The transaction details, or ``None`` when every attempt failed.
        """
        tx_details = None
        for attempt in range(max_retries):
            try:
                tx_details = await self.get_transaction_details_rpc(transaction_id)
                if tx_details is not None:
                    break
            except Exception as e:
                logging.error(f"Error fetching transaction details: {e}")

            if attempt == max_retries - 1:
                break  # nothing left to wait for
            retry_delay *= 1.2
            logging.info(f"({attempt} of {max_retries}) Waiting for transaction details for {transaction_id}. retry in {retry_delay} s.")
            await asyncio.sleep(retry_delay)
        return tx_details

    async def get_swap_transaction_details(self, tx_signature_str):
        """Summarise a swap from the transaction's top-level token instructions.

        Returns:
            A dict shaped like the one from ``get_transaction_details_rpc``, or
            ``None`` when fetching or parsing fails.
        """
        try:
            t = await solana_client.get_transaction(
                Signature.from_string(tx_signature_str), max_supported_transaction_version=0)
            if t.value is None:
                logging.error(f"Error fetching transaction details: no transaction for {tx_signature_str}")
                return None

            parsed_result = {
                "order_id": None,
                "token_in": None,
                "token_out": None,
                "amount_in": 0,
                "amount_out": 0,
                "amount_in_USD": 0,
                "amount_out_USD": 0,
                "percentage_swapped": 0
            }

            instructions = t.value.transaction.transaction.message.instructions
            token_program = Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")

            # Parse the swap instruction to extract token addresses, amounts, and types
            # NOTE(review): this reads ``program_id`` and ``parsed`` from a
            # CompiledInstruction - verify that those attributes exist on it.
            for instruction in instructions:
                if isinstance(instruction, CompiledInstruction) and instruction.program_id == token_program:
                    parsed_info = instruction.parsed.info
                    mint = parsed_info["mint"]
                    amount = float(parsed_info["tokenAmount"]["amount"]) / (10 ** parsed_info["tokenAmount"]["decimals"])

                    # Determine token in and token out based on balances
                    if parsed_result["token_in"] is None and amount > 0:
                        parsed_result["token_in"] = mint
                        parsed_result["amount_in"] = amount
                    elif parsed_result["token_out"] is None:
                        parsed_result["token_out"] = mint
                        parsed_result["amount_out"] = amount

            # Calculate percentage swapped
            if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0:
                parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100

            return parsed_result

        except Exception as e:
            logging.error(f"Error fetching transaction details: {e}")
            return None

    async def get_token_balance_rpc(self, wallet_address, token_address):
        """Return the UI balance of ``token_address`` held by ``wallet_address``.

        Only the first token account returned for the mint is read. Returns 0
        when no account or balance is found, or when the lookup fails.
        """
        try:
            accounts = await solana_jsonrpc("getTokenAccountsByOwner", [
                wallet_address,
                {
                    "mint": token_address
                }])

            if not accounts or not accounts.get('value'):
                logging.debug(f"No account found for {token_address} in {wallet_address}")
                return 0

            first_account = accounts['value'][0]['pubkey']
            balance = await solana_jsonrpc("getTokenAccountBalance", first_account)

            if balance and 'value' in balance:
                # ``uiAmount`` may be null; treat that as an empty balance.
                amount = float(balance['value'].get('uiAmount') or 0)
                logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}")
                return amount

            logging.debug(f"No balance found for {token_address} in {wallet_address}")
            return 0
        except Exception as e:
            logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
            return 0


class SolanaDEX:
    """Token price lookups and wallet balance summaries.

    State kept on the instance: ``TOKENS_INFO`` (per-mint metadata),
    ``TOKEN_PRICES``, ``TOKEN_ADDRESSES`` (monitored tokens) and the two
    wallet totals.
    """

    def __init__(self, DISPLAY_CURRENCY: str):
        """Create the RPC client and load cached token info when the file is readable."""
        self.DISPLAY_CURRENCY = DISPLAY_CURRENCY
        self.solana_client = AsyncClient("https://api.mainnet-beta.solana.com")
        self.TOKENS_INFO = {}
        self.TOKEN_PRICES = {}
        self.TOKEN_ADDRESSES = {}
        self.FOLLOWED_WALLET_VALUE = 0
        self.YOUR_WALLET_VALUE = 0

        # NOTE(review): this reads '../logs/...' while the module-level
        # save_token_info() writes './logs/...' - confirm which path is intended.
        try:
            with open('../logs/token_info.json', 'r') as f:
                self.TOKENS_INFO = json.load(f)
        except Exception as e:
            logging.error(f"Error loading token info: {str(e)}")

    async def get_token_prices(self, token_addresses: List[str]) -> Dict[str, float]:
        """Return a price for every address, trying several sources in order.

        USDC is fixed at 1.0. The remaining tokens are looked up on CoinGecko,
        Jupiter, DexScreener, Raydium and Orca, each source being asked only
        for tokens that still have no price. Tokens nobody prices get 0.0.
        Prices (and symbols, when missing) are stored in ``TOKENS_INFO``.
        """
        usdc_mint = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"
        prices = {addr: 1.0 for addr in token_addresses if addr == usdc_mint}
        remaining_tokens = [addr for addr in token_addresses if addr not in prices]

        sources = (
            self.get_prices_from_coingecko,
            self.get_prices_from_jupiter,
            self.get_prices_from_dexscreener,
            self.get_prices_from_raydium,
            self.get_prices_from_orca,
        )
        for fetch_prices in sources:
            # dict.fromkeys removes duplicates while keeping the order.
            missing_tokens = [addr for addr in dict.fromkeys(remaining_tokens) if addr not in prices]
            if not missing_tokens:
                break
            prices.update(await fetch_prices(missing_tokens))

        for token in token_addresses:
            if token not in prices:
                prices[token] = 0.0
                logging.warning(f"Price not found for token {token}. Setting to 0.")

        for token, price in prices.items():
            token_info = self.TOKENS_INFO.setdefault(token, {})
            if 'symbol' not in token_info:
                token_info['symbol'] = await self.get_token_metadata_symbol(token)
            token_info['price'] = price

        return prices

    async def get_prices_from_coingecko(self, token_addresses: List[str]) -> Dict[str, float]:
        """Query CoinGecko once per address; return only the prices that were found."""
        base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
        currency = self.DISPLAY_CURRENCY.lower()
        prices = {}

        async def fetch_single_price(session, address):
            params = {
                "contract_addresses": address,
                "vs_currencies": currency
            }
            try:
                async with session.get(base_url, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        if address in data and currency in data[address]:
                            return address, data[address][currency]
                    else:
                        logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}")
            except Exception as e:
                logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}")
            return address, None

        async with aiohttp.ClientSession() as session:
            tasks = [fetch_single_price(session, address) for address in token_addresses]
            results = await asyncio.gather(*tasks)

        for address, price in results:
            if price is not None:
                prices[address] = price

        return prices

    async def get_prices_from_dexscreener(self, token_addresses: List[str]) -> Dict[str, float]:
        """Query DexScreener once per address and use the first returned pair's ``priceUsd``."""
        base_url = "https://api.dexscreener.com/latest/dex/tokens/"
        prices = {}

        try:
            async with aiohttp.ClientSession() as session:
                tasks = [self.fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
                results = await asyncio.gather(*tasks)

            for address, result in zip(token_addresses, results):
                pairs = result.get('pairs') if isinstance(result, dict) else None
                # Use the first pair (usually the most liquid)
                price_usd = pairs[0].get('priceUsd') if pairs else None
                if price_usd is None:
                    logging.warning(f"No price data found on DexScreener for token {address}")
                    continue
                try:
                    prices[address] = float(price_usd)
                except (TypeError, ValueError):
                    # One malformed entry must not discard the other prices.
                    logging.warning(f"No price data found on DexScreener for token {address}")
        except Exception as e:
            logging.error(f"Error fetching token prices from DexScreener: {str(e)}")

        return prices

    async def get_prices_from_jupiter(self, token_addresses: List[str]) -> Dict[str, float]:
        """Query the Jupiter price endpoint for all addresses in one request."""
        url = "https://price.jup.ag/v4/price"
        params = {
            "ids": ",".join(token_addresses)
        }
        prices = {}

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        for address, price_info in data.get('data', {}).items():
                            if 'price' in price_info:
                                prices[address] = float(price_info['price'])
                    else:
                        logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}")
        except Exception as e:
            logging.error(f"Error fetching token prices from Jupiter: {str(e)}")

        return prices

    async def get_prices_from_raydium(self, token_addresses: List[str]) -> Dict[str, float]:
        """Download the Raydium price table and pick out the requested addresses."""
        url = "https://api.raydium.io/v2/main/price"
        prices = {}

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        data = await response.json()
                        for address in token_addresses:
                            if address in data:
                                prices[address] = float(data[address])
                    else:
                        logging.error(f"Failed to get token prices from Raydium. Status: {response.status}")
        except Exception as e:
            logging.error(f"Error fetching token prices from Raydium: {str(e)}")

        return prices

    async def get_prices_from_orca(self, token_addresses: List[str]) -> Dict[str, float]:
        """Download the Orca token list and pick out the requested addresses."""
        url = "https://api.orca.so/allTokens"
        prices = {}
        wanted = set(token_addresses)

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        data = await response.json()
                        for token_info in data:
                            if token_info['mint'] in wanted:
                                prices[token_info['mint']] = float(token_info['price'])
                    else:
                        logging.error(f"Failed to get token prices from Orca. Status: {response.status}")
        except Exception as e:
            logging.error(f"Error fetching token prices from Orca: {str(e)}")

        return prices

    @staticmethod
    async def fetch_token_data(session, url):
        """GET ``url`` and return the decoded JSON body, or ``None`` on any failure."""
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    return await response.json()
                logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
                return None
        except Exception as e:
            logging.error(f"Error fetching data from {url}: {str(e)}")
            return None

    async def get_sol_price(self) -> float:
        """Return the price of wrapped SOL, or 0.0 when it is unavailable."""
        sol_address = "So11111111111111111111111111111111111111112"  # Solana's wrapped SOL address
        prices = await self.get_token_prices([sol_address])
        return prices.get(sol_address, 0.0)

    async def get_wallet_balances(self, wallet_address, doGetTokenName=True):
        """Return the wallet's token balances plus its SOL balance.

        Only token accounts holding more than 1 token are reported.

        Args:
            wallet_address: Base58 address of the wallet.
            doGetTokenName: Whether to look up the symbol of unknown mints.

        Returns:
            A dict keyed by mint (and ``'SOL'``) whose values hold ``name``,
            ``address``, ``amount`` and, for tokens, ``decimals``.
        """
        balances = {}
        token_account_count = 0
        logging.info(f"Getting balances for wallet: {wallet_address}")

        try:
            response = await self.solana_client.get_token_accounts_by_owner_json_parsed(
                Pubkey.from_string(wallet_address),
                opts=TokenAccountOpts(
                    program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
                ),
                commitment=Confirmed
            )

            accounts = response.value or []
            token_account_count = len(accounts)

            for account in accounts:
                try:
                    parsed_data = account.account.data.parsed
                    info = parsed_data.get('info') if isinstance(parsed_data, dict) else None
                    if not (isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info):
                        logging.warning(f"Unexpected data format for account: {account}")
                        continue

                    mint = info['mint']
                    decimals = int(info['tokenAmount']['decimals'])
                    amount = float(int(info['tokenAmount']['amount']) / 10**decimals)
                    if amount <= 1:
                        continue

                    token_name = None
                    if mint in self.TOKENS_INFO:
                        token_name = self.TOKENS_INFO[mint].get('symbol')
                    elif doGetTokenName:
                        token_name = await self.get_token_metadata_symbol(mint) or 'N/A'
                        self.TOKENS_INFO[mint] = {'symbol': token_name}
                        await asyncio.sleep(2)

                    token_entry = self.TOKENS_INFO.setdefault(mint, {})
                    token_entry['holdedAmount'] = round(amount, decimals)
                    token_entry['decimals'] = decimals

                    balances[mint] = {
                        'name': token_name or 'N/A',
                        'address': mint,
                        'amount': amount,
                        'decimals': decimals
                    }
                    logging.debug(f"Account balance for {token_name} ({mint}): {amount}")
                except Exception as e:
                    logging.error(f"Error parsing account data: {str(e)}")

            sol_balance = await self.solana_client.get_balance(Pubkey.from_string(wallet_address))
            if sol_balance.value is not None:
                balances['SOL'] = {
                    'name': 'SOL',
                    'address': 'SOL',
                    'amount': sol_balance.value / 1e9
                }
            else:
                logging.warning(f"SOL balance response missing for wallet: {wallet_address}")

        except Exception as e:
            logging.error(f"Error getting wallet balances: {str(e)}")

        # Uses a counter rather than ``response`` so a failed RPC call above
        # cannot raise a second error here.
        logging.info(f"Found {token_account_count} ({len(balances)} non zero) token accounts for wallet: {wallet_address}")
        return balances

    async def convert_balances_to_currency(self, balances, sol_price):
        """Return a copy of ``balances`` with a ``value`` added to every entry.

        SOL is valued with ``sol_price``, other tokens with ``TOKEN_PRICES``;
        ``value`` is ``None`` when no price is known.
        """
        converted_balances = {}
        for address, info in balances.items():
            converted_balance = info.copy()
            if info['name'] == 'SOL':
                converted_balance['value'] = info['amount'] * sol_price
            elif address in self.TOKEN_PRICES:
                converted_balance['value'] = info['amount'] * self.TOKEN_PRICES[address]
            else:
                converted_balance['value'] = None
                logging.warning(f"Price not available for token {info['name']} ({address})")
            converted_balances[address] = converted_balance
        return converted_balances

    async def list_initial_wallet_states(self, FOLLOWED_WALLET, YOUR_WALLET):
        """Fetch both wallets' balances, price them, update the totals and log a summary."""
        followed_wallet_balances = await self.get_wallet_balances(FOLLOWED_WALLET)
        your_wallet_balances = await self.get_wallet_balances(YOUR_WALLET)

        # TOKEN_ADDRESSES is keyed by address (its values are dicts, which
        # cannot go into a set). 'SOL' is a placeholder key rather than a mint;
        # native SOL is priced separately through get_sol_price().
        all_token_addresses = [
            address
            for address in (set(followed_wallet_balances) | set(your_wallet_balances) | set(self.TOKEN_ADDRESSES))
            if address != 'SOL'
        ]

        self.TOKEN_PRICES = await self.get_token_prices(all_token_addresses)
        sol_price = await self.get_sol_price()

        followed_converted_balances = await self.convert_balances_to_currency(followed_wallet_balances, sol_price)
        your_converted_balances = await self.convert_balances_to_currency(your_wallet_balances, sol_price)

        self.TOKEN_ADDRESSES = {
            address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items()
            if info['value'] is not None and info['value'] > 0
        }
        logging.info(f"Monitoring balances for tokens: {[info['name'] for info in self.TOKEN_ADDRESSES.values()]}")

        followed_wallet_state = []
        self.FOLLOWED_WALLET_VALUE = 0
        for address, info in followed_converted_balances.items():
            if info['value'] is not None and info['value'] > 0:
                followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {self.DISPLAY_CURRENCY} ({info['address']})")
                self.FOLLOWED_WALLET_VALUE += info['value']

        your_wallet_state = []
        self.YOUR_WALLET_VALUE = 0
        for address, info in your_converted_balances.items():
            if info['value'] is not None and info['value'] > 0:
                your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {self.DISPLAY_CURRENCY}")
                self.YOUR_WALLET_VALUE += info['value']

        message = (
            f"Initial Wallet States (All balances in {self.DISPLAY_CURRENCY}):\n\n"
            f"Followed Wallet ({FOLLOWED_WALLET}):\n"
            f"{chr(10).join(followed_wallet_state)}\n"
            f"Total Value: {self.FOLLOWED_WALLET_VALUE:.2f} {self.DISPLAY_CURRENCY}\n\n"
            f"Your Wallet ({YOUR_WALLET}):\n"
            f"{chr(10).join(your_wallet_state)}\n"
            f"Total Value: {self.YOUR_WALLET_VALUE:.2f} {self.DISPLAY_CURRENCY}\n\n"
            f"Monitored Tokens:\n"
            f"{', '.join([self.safe_get_property(info, 'name') for info in self.TOKEN_ADDRESSES.values()])}"
        )

        logging.info(message)
        # await telegram_utils.send_telegram_message(message)

        # save token info to file
        await self.save_token_info()

    @staticmethod
    def safe_get_property(obj, prop):
        """Return ``obj[prop]``, or ``'N/A'`` when the key is missing."""
        return obj.get(prop, 'N/A')

    async def get_token_metadata_symbol(self, token_address):
        """Placeholder: currently returns ``None`` for every token."""
        # Implement this method to fetch token metadata symbol
        pass

    async def save_token_info(self):
        """Placeholder: currently does nothing."""
        # Implement this method to save token info to a file
        pass


async def save_token_info():
    """Write the shared ``DEX.TOKENS_INFO`` cache to ``./logs/token_info.json``."""
    os.makedirs('./logs', exist_ok=True)
    with open('./logs/token_info.json', 'w') as f:
        json.dump(DEX.TOKENS_INFO, f, indent=2)


DEX = SolanaDEX(DISPLAY_CURRENCY)


async def _list_initial_wallet_states(*_args, **_kwargs):
    """Subscription callback: list both wallets' states, ignoring callback arguments."""
    await DEX.list_initial_wallet_states(FOLLOWED_WALLET, YOUR_WALLET)


# Pass a callable, not an already-created coroutine object: a coroutine created
# at import time is never awaited and cannot be invoked as a callback.
SAPI = SolanaAPI(on_initial_subscription_callback=_list_initial_wallet_states)