From d63d3d41bc4d34fafadf5beb214a74c06e74f389 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 15 Oct 2024 23:19:30 +0300 Subject: [PATCH] big refactoring --- crypto/sol/app.py | 609 +++-------------------------- crypto/sol/modules/SolanaAPI.py | 656 +++++++++++++++++++++++++++----- crypto/sol/modules/utils.py | 2 +- 3 files changed, 612 insertions(+), 655 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 6c68b93..56821b1 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -5,7 +5,6 @@ from flask import Flask, render_template, request, jsonify from solana.rpc.async_api import AsyncClient from solana.transaction import Signature from solana.rpc.websocket_api import connect -from solana.rpc.types import TokenAccountOpts, TxOpts from solana.rpc.commitment import Confirmed, Processed from solana.transaction import Transaction from spl.token.client import Token @@ -24,6 +23,7 @@ from solders.instruction import CompiledInstruction from solders import message from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA from dexscreener import DexscreenerClient +from solana.rpc.types import TokenAccountOpts, TxOpts import datetime import logging @@ -55,38 +55,9 @@ from config import ( error_logger ) -from modules.utils import (get_token_prices, get_sol_price, get_wallet_balances, convert_balances_to_currency, get_swap_transaction_details) -from modules.SolanaAPI import SolanaAPI, solana_jsonrpc, wallet_watch_loop -from modules.utils import telegram_utils, send_telegram_message - -# # config = load_config() -# load_dotenv() -# load_dotenv('.env.secret') -# # Configuration -# DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") -# FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") -# YOUR_WALLET = os.getenv("YOUR_WALLET") -# TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") -# SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") -# SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") -# DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') -# BOT_NAME = os.getenv("BOT_NAME") - -# logger = logging.getLogger(__name__) -# logging.basicConfig(level=logging.DEBUG) -# #logging.basicConfig(level=logging.INFO) - -# # Set up error logger -# log_dir = './logs' -# log_file = os.path.join(log_dir, 'error.log') -# os.makedirs(log_dir, exist_ok=True) -# error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5) -# error_file_handler.setLevel(logging.ERROR) -# error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') ) -# error_logger = logging.getLogger('error_logger') -# error_logger.setLevel(logging.ERROR) -# error_logger.addHandler(error_file_handler) +from modules.SolanaAPI import SolanaAPI, SolanaDEX +from modules.utils import telegram_utils # Function to find the latest log file @@ -145,7 +116,7 @@ if not telegram_utils.bot: asyncio.run(telegram_utils.initialize()) except Exception as e: logging.error(f"Error initializing Telegram bot: {str(e)}") -# async def send_telegram_message(message): +# async def telegram_utils.send_telegram_message(message): # try: # await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) # logging.info(f"Telegram message sent: {message}") @@ -161,196 +132,6 @@ if not telegram_utils.bot: # # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # # -async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: - global TOKENS_INFO - - # Skip for USD - prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"} - remaining_tokens = [addr for addr in token_addresses if addr not in prices] - - # Try CoinGecko - coingecko_prices = await get_prices_from_coingecko(remaining_tokens) - prices.update(coingecko_prices) - - - # For remaining missing tokens, try Jupiter - missing_tokens = set(remaining_tokens) - set(prices.keys()) - if missing_tokens: - jupiter_prices = await get_prices_from_jupiter(list(missing_tokens)) - prices.update(jupiter_prices) - - - # For tokens not found in CoinGecko, use DexScreener - missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys()) - if missing_tokens: - dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens)) - prices.update(dexscreener_prices) - - # For remaining missing tokens, try Raydium - missing_tokens = set(remaining_tokens) - set(prices.keys()) - if missing_tokens: - raydium_prices = await get_prices_from_raydium(list(missing_tokens)) - prices.update(raydium_prices) - - # For remaining missing tokens, try Orca - missing_tokens = set(remaining_tokens) - set(prices.keys()) - if missing_tokens: - orca_prices = await get_prices_from_orca(list(missing_tokens)) - prices.update(orca_prices) - - # If any tokens are still missing, set their prices to 0 - for token in set(token_addresses) - set(prices.keys()): - prices[token] = 0.0 - logging.warning(f"Price not found for token {token}. Setting to 0.") - - for token, price in prices.items(): - token_info = TOKENS_INFO.setdefault(token, {}) - if 'symbol' not in token_info: - token_info['symbol'] = await get_token_metadata_symbol(token) - token_info['price'] = price - - return prices - - -async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]: - base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana" - prices = {} - - async def fetch_single_price(session, address): - params = { - "contract_addresses": address, - "vs_currencies": DISPLAY_CURRENCY.lower() - } - try: - async with session.get(base_url, params=params) as response: - if response.status == 200: - data = await response.json() - if address in data and DISPLAY_CURRENCY.lower() in data[address]: - return address, data[address][DISPLAY_CURRENCY.lower()] - else: - logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}") - return address, None - - async with aiohttp.ClientSession() as session: - tasks = [fetch_single_price(session, address) for address in token_addresses] - results = await asyncio.gather(*tasks) - - for address, price in results: - if price is not None: - prices[address] = price - - return prices - -async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]: - base_url = "https://api.dexscreener.com/latest/dex/tokens/" - prices = {} - - try: - async with aiohttp.ClientSession() as session: - tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] - results = await asyncio.gather(*tasks) - - for address, result in zip(token_addresses, results): - if result and 'pairs' in result and result['pairs']: - pair = result['pairs'][0] # Use the first pair (usually the most liquid) - prices[address] = float(pair['priceUsd']) - else: - logging.warning(f"No price data found on DexScreener for token {address}") - except Exception as e: - logging.error(f"Error fetching token prices from DexScreener: {str(e)}") - - return prices - -async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]: - url = "https://price.jup.ag/v4/price" - params = { - "ids": ",".join(token_addresses) - } - prices = {} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url, params=params) as response: - if response.status == 200: - data = await response.json() - for address, price_info in data.get('data', {}).items(): - if 'price' in price_info: - prices[address] = float(price_info['price']) - else: - logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching token prices from Jupiter: {str(e)}") - return prices - -# New function for Raydium -async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]: - url = "https://api.raydium.io/v2/main/price" - prices = {} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - for address in token_addresses: - if address in data: - prices[address] = float(data[address]) - else: - logging.error(f"Failed to get token prices from Raydium. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching token prices from Raydium: {str(e)}") - return prices - -# New function for Orca -async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]: - url = "https://api.orca.so/allTokens" - prices = {} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - for token_info in data: - if token_info['mint'] in token_addresses: - prices[token_info['mint']] = float(token_info['price']) - else: - logging.error(f"Failed to get token prices from Orca. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching token prices from Orca: {str(e)}") - return prices - - -async def fetch_token_data(session, url): - try: - async with session.get(url) as response: - if response.status == 200: - return await response.json() - else: - logging.error(f"Failed to fetch data from {url}. Status: {response.status}") - return None - except Exception as e: - logging.error(f"Error fetching data from {url}: {str(e)}") - return None - -async def get_sol_price() -> float: - url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}" - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - return data['solana'][DISPLAY_CURRENCY.lower()] - else: - logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}") - return await get_sol_price_from_dexscreener() - -async def get_sol_price_from_dexscreener() -> float: - sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address - prices = await get_prices_from_dexscreener([sol_address]) - return prices.get(sol_address, 0.0) - # # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # # @@ -415,43 +196,6 @@ from spl.token.async_client import AsyncToken from spl.token.constants import TOKEN_PROGRAM_ID from borsh_construct import String, CStruct -async def get_token_metadata_symbol(mint_address): - global TOKENS_INFO - - if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]: - return TOKENS_INFO[mint_address].get('symbol') - - try: - account_data_result = await solana_jsonrpc("getAccountInfo", mint_address) - if 'value' in account_data_result and 'data' in account_data_result['value']: - account_data_data = account_data_result['value']['data'] - if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: - account_data_info = account_data_data['parsed']['info'] - if 'decimals' in account_data_info: - if mint_address in TOKENS_INFO: - TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals'] - else: - TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']} - if 'tokenName' in account_data_info: - if mint_address in TOKENS_INFO: - TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName'] - else: - TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']} - - metadata = await get_token_metadata(mint_address) - if metadata: - if mint_address in TOKENS_INFO: - TOKENS_INFO[mint_address].update(metadata) - else: - TOKENS_INFO[mint_address] = metadata - await save_token_info() - # TOKENS_INFO[mint_address] = metadata - # return metadata.get('symbol') or metadata.get('name') - return TOKENS_INFO[mint_address].get('symbol') - except Exception as e: - logging.error(f"Error fetching token name for {mint_address}: {str(e)}") - return None - @@ -557,80 +301,6 @@ async def get_token_metadata(mint_address): return None -async def get_wallet_balances(wallet_address, doGetTokenName=True): - balances = {} - logging.info(f"Getting balances for wallet: {wallet_address}") - global TOKENS_INFO - try: - response = await solana_client.get_token_accounts_by_owner_json_parsed( - Pubkey.from_string(wallet_address), - opts=TokenAccountOpts( - program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") - ), - commitment=Confirmed - ) - - if response.value: - for account in response.value: - try: - parsed_data = account.account.data.parsed - if isinstance(parsed_data, dict) and 'info' in parsed_data: - info = parsed_data['info'] - if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info: - mint = info['mint'] - decimals = info['tokenAmount']['decimals'] - amount = float(info['tokenAmount']['amount'])/10**decimals - if amount > 0: - if mint in TOKENS_INFO: - token_name = TOKENS_INFO[mint].get('symbol') - elif doGetTokenName: - token_name = await get_token_metadata_symbol(mint) or 'N/A' - # sleep for 1 second to avoid rate limiting - await asyncio.sleep(2) - - TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals) - TOKENS_INFO[mint]['decimals'] = decimals - balances[mint] = { - 'name': token_name or 'N/A', - 'address': mint, - 'amount': amount, - 'decimals': decimals - } - # sleep for 1 second to avoid rate limiting - logging.debug(f"Account balance for {token_name} ({mint}): {amount}") - else: - logging.warning(f"Unexpected data format for account: {account}") - except Exception as e: - logging.error(f"Error parsing account data: {str(e)}") - - sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) - if sol_balance.value is not None: - balances['SOL'] = { - 'name': 'SOL', - 'address': 'SOL', - 'amount': sol_balance.value / 1e9 - } - else: - logging.warning(f"SOL balance response missing for wallet: {wallet_address}") - - except Exception as e: - logging.error(f"Error getting wallet balances: {str(e)}") - logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") - return balances - -async def convert_balances_to_currency(balances , sol_price): - converted_balances = {} - for address, info in balances.items(): - converted_balance = info.copy() # Create a copy of the original info - if info['name'] == 'SOL': - converted_balance['value'] = info['amount'] * sol_price - elif address in TOKEN_PRICES: - converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] - else: - converted_balance['value'] = None # Price not available - logging.warning(f"Price not available for token {info['name']} ({address})") - converted_balances[address] = converted_balance - return converted_balances async def get_swap_transaction_details(tx_signature_str): @@ -675,201 +345,10 @@ async def get_swap_transaction_details(tx_signature_str): return None - -# # # RAW Solana API RPC # # # - -#this is the meat of the application -async def get_transaction_details_rpc(tx_signature, readfromDump=False): - global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO - try: - if readfromDump and os.path.exists('./logs/transation_details.json'): - with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details - transaction_details = json.load(f) - return transaction_details - else: - transaction_details = await solana_jsonrpc("getTransaction", tx_signature) - with open('./logs/transation_details.json', 'w') as f: - json.dump(transaction_details, f, indent=2) - - if transaction_details is None: - logging.error(f"Error fetching transaction details for {tx_signature}") - return None - - # Initialize default result structure - parsed_result = { - "order_id": None, - "token_in": None, - "token_out": None, - "amount_in": 0, - "amount_out": 0, - "amount_in_USD": 0, - "amount_out_USD": 0, - "percentage_swapped": 0 - } - - # Extract order_id from logs - log_messages = transaction_details.get("meta", {}).get("logMessages", []) - for log in log_messages: - if "order_id" in log: - parsed_result["order_id"] = log.split(":")[2].strip() - break - - # Extract token transfers from innerInstructions - inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) - for instruction_set in inner_instructions: - for instruction in instruction_set.get('instructions', []): - if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked': - info = instruction['parsed']['info'] - mint = info['mint'] - amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals - - # Determine which token is being swapped in and out based on zero balances - if parsed_result["token_in"] is None and amount > 0: - parsed_result["token_in"] = mint - parsed_result["amount_in"] = amount - - - if parsed_result["token_in"] is None or parsed_result["token_out"] is None: - # if we've failed to extract token_in and token_out from the transaction details, try a second method - inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) - transfers = [] - - for instruction_set in inner_instructions: - for instruction in instruction_set.get('instructions', []): - if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']: - info = instruction['parsed']['info'] - amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount']) - decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0 - adjusted_amount = amount / (10 ** decimals) - # adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0)) - transfers.append({ - 'mint': info.get('mint'), - 'amount': adjusted_amount, - 'source': info['source'], - 'destination': info['destination'] - }) - - # Identify token_in and token_out - if len(transfers) >= 2: - parsed_result["token_in"] = transfers[0]['mint'] - parsed_result["amount_in"] = transfers[0]['amount'] - parsed_result["token_out"] = transfers[-1]['mint'] - parsed_result["amount_out"] = transfers[-1]['amount'] - - # If mint is not provided, query the Solana network for the account data - if parsed_result["token_in"] is None or parsed_result["token_out"] is None: - #for transfer in transfers: - # do only first and last transfer - for transfer in [transfers[0], transfers[-1]]: - if transfer['mint'] is None: - # Query the Solana network for the account data - account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source']) - - if 'value' in account_data_result and 'data' in account_data_result['value']: - account_data_value = account_data_result['value'] - account_data_data = account_data_value['data'] - if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: - account_data_info = account_data_data['parsed']['info'] - if 'mint' in account_data_info: - transfer['mint'] = account_data_info['mint'] - if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]: - await get_token_metadata_symbol(transfer['mint']) - # get actual prices - current_price = await get_token_prices([transfer['mint']]) - - if parsed_result["token_in"] is None: - parsed_result["token_in"] = transfer['mint'] - parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol'] - parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] - parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']]) - elif parsed_result["token_out"] is None: - parsed_result["token_out"] = transfer['mint'] - parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol'] - parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] - parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price'] - - pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', []) - for balance in pre_balalnces: - if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET: - parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals'] - break - - - # Calculate percentage swapped - try: - if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0: - parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100 - else: - # calculate based on total wallet value: FOLLOWED_WALLET_VALUE - parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100 - except Exception as e: - logging.error(f"Error calculating percentage swapped: {e}") - - return parsed_result - - except requests.exceptions.RequestException as e: - print("Error fetching transaction details:", e) - - # # # # # # # # # # Functionality # # # # # # # # # # -async def list_initial_wallet_states(): - global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES - global TOKENS_INFO # new - - followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) - your_wallet_balances = await get_wallet_balances(YOUR_WALLET) - - all_token_addresses = list(set(followed_wallet_balances.keys()) | - set(your_wallet_balances.keys()) | - set(TOKEN_ADDRESSES.values())) - - TOKEN_PRICES = await get_token_prices(all_token_addresses) - sol_price = await get_sol_price() - - followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) - your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) - - - TOKEN_ADDRESSES = { - address: info for address, - info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0 - } - logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") - - followed_wallet_state = [] - FOLLOWED_WALLET_VALUE = 0 - for address, info in followed_converted_balances.items(): - if info['value'] is not None and info['value'] > 0: - followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})") - FOLLOWED_WALLET_VALUE += info['value'] - - your_wallet_state = [] - YOUR_WALLET_VALUE = 0 - for address, info in your_converted_balances.items(): - if info['value'] is not None and info['value'] > 0: - your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") - YOUR_WALLET_VALUE += info['value'] - - message = ( - f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" - f"Followed Wallet ({FOLLOWED_WALLET}):\n" - f"{chr(10).join(followed_wallet_state)}\n" - f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" - f"Your Wallet ({YOUR_WALLET}):\n" - f"{chr(10).join(your_wallet_state)}\n" - f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" - f"Monitored Tokens:\n" - f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}" - ) - - logging.info(message) - await send_telegram_message(message) - - # save token info to file - await save_token_info() def safe_get_property(info, property_name, default='Unknown'): if not isinstance(info, dict): @@ -887,7 +366,7 @@ async def get_transaction_details_with_retry(transaction_id, retry_delay = 8, ma # qwery every 5 seconds for the transaction details untill not None or 30 seconds for _ in range(max_retries): try: - tx_details = await get_transaction_details_rpc(transaction_id) + tx_details = await solanaAPI.get_transaction_details_rpc(transaction_id) if tx_details is not None: break except Exception as e: @@ -1015,13 +494,13 @@ async def process_log(log_result): f"Amount In USD: {tr_details['amount_in_USD']:.2f}\n" f"Percentage Swapped: {tr_details['percentage_swapped']:.2f}%" ) - await send_telegram_message(message_text) + await telegram_utils.send_telegram_message(message_text) await follow_move(tr_details) await save_token_info() except Exception as e: logging.error(f"Error aquiring log details and following: {e}") - await send_telegram_message(f"Not followed! Error following move.") + await telegram_utils.send_telegram_message(f"Not followed! Error following move.") @@ -1072,14 +551,14 @@ def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: async def follow_move(move): - your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) + your_balances = await solanaAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False) your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) if your_balance_info is not None: # Use the balance print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}") else: print("No ballance found for {move['symbol_in']}. Skipping move.") - await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") + await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") return your_balance = your_balance_info['amount'] @@ -1087,12 +566,12 @@ async def follow_move(move): token_info = TOKENS_INFO.get(move['token_in']) token_name_in = token_info.get('symbol') or await get_token_metadata(move['token_in']) - token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await get_token_metadata_symbol(move['token_out']) + token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await solanaAPI.get_token_metadata_symbol(move['token_out']) if not your_balance: msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move." logging.warning(msg) - await send_telegram_message(msg) + await telegram_utils.send_telegram_message(msg) return # move["percentage_swapped"] = (move["amount_out"] / move["amount_in"]) * 100 @@ -1119,7 +598,7 @@ async def follow_move(move): f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway." ) logging.warning(msg) - await send_telegram_message(msg) + await telegram_utils.send_telegram_message(msg) try: try: @@ -1129,7 +608,7 @@ async def follow_move(move): ) # logging.info(notification) # error_logger.info(notification) - # await send_telegram_message(notification) + # await telegram_utils.send_telegram_message(notification) except Exception as e: logging.error(f"Error sending notification: {e}") @@ -1159,7 +638,7 @@ async def follow_move(move): # append to notification notification += f"\n\nTransaction: {transaction_id}" - await send_telegram_message(f"Follow Transaction Sent: {transaction_id}") + await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}") tx_details = await get_transaction_details_with_retry(transaction_id) if tx_details is not None: @@ -1173,7 +652,7 @@ async def follow_move(move): # log the errors to /logs/errors.log error_logger.error(error_message) error_logger.exception(e) - await send_telegram_message(error_message) + await telegram_utils.send_telegram_message(error_message) amount = amount * 0.75 await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) @@ -1198,7 +677,7 @@ async def follow_move(move): f"\n\nTransaction: {transaction_id}" ) logging.info(notification) - await send_telegram_message(notification) + await telegram_utils.send_telegram_message(notification) except Exception as e: logging.error(f"Error sending notification: {e}") @@ -1210,9 +689,9 @@ async def follow_move(move): error_logger.exception(e) \ # if error_message contains 'Program log: Error: insufficient funds' if 'insufficient funds' in error_message: - await send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.") + await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.") else: - await send_telegram_message(error_message) + await telegram_utils.send_telegram_message(error_message) # Helper functions @@ -1236,41 +715,45 @@ async def check_PK(): if not pk: logging.error("Private key not found in environment variables. Will not be able to sign transactions.") # send TG warning message - await send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.") + await telegram_utils.send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.") +solanaAPI = SolanaAPI(process_transaction_callback=process_log) + async def main(): - global bot, PROCESSING_LOG + global solanaAPI, bot, PROCESSING_LOG - await send_telegram_message("Solana Agent Started. Connecting to mainnet...") + await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") await check_PK() # new: restart wallet_watch_loop every hour - while True: - wallet_watch_task = asyncio.create_task(wallet_watch_loop()) + await solanaAPI.wallet_watch_loop() + + # while True: + # wallet_watch_task = asyncio.create_task(solanaAPI.wallet_watch_loop()) - try: - # Wait for an hour or until the task completes, whichever comes first - await asyncio.wait_for(wallet_watch_task, timeout=3600) - except asyncio.TimeoutError: - # If an hour has passed, cancel the task if not PROCESSING - if PROCESSING_LOG: - logging.info("wallet_watch_loop is processing logs. Will not restart.") - await send_telegram_message("wallet_watch_loop is processing logs. Will not restart.") - else: - wallet_watch_task.cancel() - try: - await wallet_watch_task - except asyncio.CancelledError: - logging.info("wallet_watch_loop was cancelled after running for an hour") - except Exception as e: - logging.error(f"Error in wallet_watch_loop: {str(e)}") - await send_telegram_message(f"Error in wallet_watch_loop: {str(e)}") + # try: + # # Wait for an hour or until the task completes, whichever comes first + # await asyncio.wait_for(wallet_watch_task, timeout=3600) + # except asyncio.TimeoutError: + # # If an hour has passed, cancel the task if not PROCESSING + # if PROCESSING_LOG: + # logging.info("wallet_watch_loop is processing logs. Will not restart.") + # await telegram_utils.send_telegram_message("wallet_watch_loop is processing logs. Will not restart.") + # else: + # wallet_watch_task.cancel() + # try: + # await wallet_watch_task + # except asyncio.CancelledError: + # logging.info("wallet_watch_loop was cancelled after running for an hour") + # except Exception as e: + # logging.error(f"Error in wallet_watch_loop: {str(e)}") + # await telegram_utils.send_telegram_message(f"Error in wallet_watch_loop: {str(e)}") - logging.info("Restarting wallet_watch_loop") - await send_telegram_message("Restarting wallet_watch_loop") + # logging.info("Restarting wallet_watch_loop") + # await telegram_utils.send_telegram_message("Restarting wallet_watch_loop") diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index bc19389..e0fa5cb 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -1,5 +1,7 @@ import sys import os + +import aiohttp sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import asyncio @@ -7,23 +9,21 @@ import json import logging import random import websockets -from typing import Optional +from typing import Dict, List, Optional import requests -import datetime +from datetime import datetime +from solana.rpc.types import TokenAccountOpts, TxOpts logger = logging.getLogger(__name__) SOLANA_ENDPOINTS = [ "wss://api.mainnet-beta.solana.com", - # "wss://solana-api.projectserum.com", - # "wss://rpc.ankr.com/solana", - # "wss://mainnet.rpcpool.com", ] PING_INTERVAL = 30 -SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes +SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 1 minute from config import ( -FOLLOWED_WALLET, SOLANA_HTTP_URL + FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY ) from modules.utils import telegram_utils @@ -106,85 +106,60 @@ class SolanaWS: async def process_messages(self): while True: message = await self.message_queue.get() - await self.on_message(message) + if self.on_message: + await self.on_message(message) logger.info(f"Received message: {message}") async def close(self): if self.websocket: await self.websocket.close() logger.info("WebSocket connection closed") - while True: - message = await self.message_queue.get() - try: - response_data = json.loads(message) - if 'params' in response_data: - log = response_data['params']['result'] - await process_log(log) - else: - logger.warning(f"Unexpected response: {response_data}") - except json.JSONDecodeError as e: - logger.error(f"Failed to decode JSON: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred while processing message: {e}") - finally: - self.message_queue.task_done() + async def solana_jsonrpc(method, params=None, jsonParsed=True): + if not isinstance(params, list): + params = [params] if params is not None else [] -async def solana_jsonrpc(method, params = None, jsonParsed = True): - # target json example: - # data = { - # "jsonrpc": "2.0", - # "id": 1, - # "method": "getTransaction", - # "params": [ - # tx_signature, - # { - # "encoding": "jsonParsed", - # "maxSupportedTransactionVersion": 0 - # } - # ] - # } - # if param is not array, make it array - if not isinstance(params, list): - params = [params] + data = { + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params + } + + if jsonParsed: + data["params"].append({"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}) + else: + data["params"].append({"maxSupportedTransactionVersion": 0}) - data = { - "jsonrpc": "2.0", - "id": 1, - "method": method, - "params": params or [] - } - data["params"].append({"maxSupportedTransactionVersion": 0}) - if jsonParsed: - data["params"][1]["encoding"] = "jsonParsed" - - - try: - # url = 'https://solana.drpc.org' - response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) - response.raise_for_status() # Raises an error for bad responses - result = response.json() - if not 'result' in result or 'error' in result: - print("Error fetching data from Solana RPC:", result) + try: + response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) + response.raise_for_status() + result = response.json() + if 'result' not in result or 'error' in result: + logger.error("Error fetching data from Solana RPC:", result) + return None + return result['result'] + except Exception as e: + logger.error(f"Error fetching data from Solana RPC: {e}") return None - return result['result'] - except Exception as e: - logging.error(f"Error fetching data from Solana RPC: {e}") - return None - -class SolanaAPI: - - def __init__(self, process_log_callback, send_telegram_message_callback, list_initial_wallet_states_callback): - self.process_log = process_log_callback - self.list_initial_wallet_states = list_initial_wallet_states_callback + +class SolanaAPI: + def __init__(self, process_transaction_callback, on_initial_subscription_callback = None, on_bot_message=None): + self.process_transaction = process_transaction_callback + self.on_initial_subscription = on_initial_subscription_callback + self.on_bot_message = on_bot_message, + + self.dex = SolanaDEX(DISPLAY_CURRENCY) + self.solana_ws = SolanaWS(on_message=self.process_transaction) async def process_messages(self, solana_ws): while True: message = await solana_ws.message_queue.get() - await self.process_log(message) - - async def wallet_watch_loop(): - solana_ws = SolanaWS(on_message=process_log) + await self.process_transaction(message) + + async def wallet_watch_loop(self): + + solana_ws = SolanaWS(on_message=self.process_transaction) first_subscription = True while True: @@ -193,10 +168,10 @@ class SolanaAPI: await solana_ws.subscribe() if first_subscription: - asyncio.create_task(self.list_initial_wallet_states()) + asyncio.create_task(self.on_initial_subscription()) first_subscription = False - await telegram_utils.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") + await self.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") receive_task = asyncio.create_task(solana_ws.receive_messages()) process_task = asyncio.create_task(solana_ws.process_messages()) @@ -214,20 +189,15 @@ class SolanaAPI: finally: await solana_ws.unsubscribe() if solana_ws.websocket: - await solana_ws.websocket.close() - await telegram_utils.send_telegram_message("Reconnecting...") + await solana_ws.close() + await self.send_telegram_message("Reconnecting...") await asyncio.sleep(5) - - async def process_transaction(signature): - # Implement your logic to process each transaction + + async def process_transaction(self, signature): print(f"Processing transaction: {signature['signature']}") - # You can add more processing logic here, such as storing in a database, - # triggering notifications, etc. - # Example usage - # async def main(): - # account_address = "Vote111111111111111111111111111111111111111" - - async def get_last_transactions(account_address, check_interval=300, limit=1000): + # Add your transaction processing logic here + + async def get_last_transactions(self, account_address, check_interval=300, limit=1000): last_check_time = None last_signature = None @@ -252,17 +222,521 @@ class SolanaAPI: if last_signature and signature['signature'] == last_signature: break - # Process the transaction - await process_transaction(signature) + await self.process_transaction(signature) if result: last_signature = result[0]['signature'] last_check_time = current_time - await asyncio.sleep(1) # Sleep for 1 second before checking again + await asyncio.sleep(1) + + async def get_token_metadata_symbol(mint_address): + global TOKENS_INFO + + if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]: + return TOKENS_INFO[mint_address].get('symbol') + + try: + account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", mint_address) + if 'value' in account_data_result and 'data' in account_data_result['value']: + account_data_data = account_data_result['value']['data'] + if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: + account_data_info = account_data_data['parsed']['info'] + if 'decimals' in account_data_info: + if mint_address in TOKENS_INFO: + TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals'] + else: + TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']} + if 'tokenName' in account_data_info: + if mint_address in TOKENS_INFO: + TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName'] + else: + TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']} + + metadata = await get_token_metadata(mint_address) + if metadata: + if mint_address in TOKENS_INFO: + TOKENS_INFO[mint_address].update(metadata) + else: + TOKENS_INFO[mint_address] = metadata + await save_token_info() + # TOKENS_INFO[mint_address] = metadata + # return metadata.get('symbol') or metadata.get('name') + return TOKENS_INFO[mint_address].get('symbol') + except Exception as e: + logging.error(f"Error fetching token name for {mint_address}: {str(e)}") + return None + + async def get_transaction_details_rpc(tx_signature, readfromDump=False): + global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO + try: + if readfromDump and os.path.exists('./logs/transation_details.json'): + with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details + transaction_details = json.load(f) + return transaction_details + else: + transaction_details = await solana_jsonrpc("getTransaction", tx_signature) + with open('./logs/transation_details.json', 'w') as f: + json.dump(transaction_details, f, indent=2) + + if transaction_details is None: + logging.error(f"Error fetching transaction details for {tx_signature}") + return None + + # Initialize default result structure + parsed_result = { + "order_id": None, + "token_in": None, + "token_out": None, + "amount_in": 0, + "amount_out": 0, + "amount_in_USD": 0, + "amount_out_USD": 0, + "percentage_swapped": 0 + } + + # Extract order_id from logs + log_messages = transaction_details.get("meta", {}).get("logMessages", []) + for log in log_messages: + if "order_id" in log: + parsed_result["order_id"] = log.split(":")[2].strip() + break + + # Extract token transfers from innerInstructions + inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) + for instruction_set in inner_instructions: + for instruction in instruction_set.get('instructions', []): + if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked': + info = instruction['parsed']['info'] + mint = info['mint'] + amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals + + # Determine which token is being swapped in and out based on zero balances + if parsed_result["token_in"] is None and amount > 0: + parsed_result["token_in"] = mint + parsed_result["amount_in"] = amount + + + if parsed_result["token_in"] is None or parsed_result["token_out"] is None: + # if we've failed to extract token_in and token_out from the transaction details, try a second method + inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) + transfers = [] + + for instruction_set in inner_instructions: + for instruction in instruction_set.get('instructions', []): + if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']: + info = instruction['parsed']['info'] + amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount']) + decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0 + adjusted_amount = amount / (10 ** decimals) + # adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0)) + transfers.append({ + 'mint': info.get('mint'), + 'amount': adjusted_amount, + 'source': info['source'], + 'destination': info['destination'] + }) + + # Identify token_in and token_out + if len(transfers) >= 2: + parsed_result["token_in"] = transfers[0]['mint'] + parsed_result["amount_in"] = transfers[0]['amount'] + parsed_result["token_out"] = transfers[-1]['mint'] + parsed_result["amount_out"] = transfers[-1]['amount'] + + # If mint is not provided, query the Solana network for the account data + if parsed_result["token_in"] is None or parsed_result["token_out"] is None: + #for transfer in transfers: + # do only first and last transfer + for transfer in [transfers[0], transfers[-1]]: + if transfer['mint'] is None: + # Query the Solana network for the account data + account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source']) + + if 'value' in account_data_result and 'data' in account_data_result['value']: + account_data_value = account_data_result['value'] + account_data_data = account_data_value['data'] + if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: + account_data_info = account_data_data['parsed']['info'] + if 'mint' in account_data_info: + transfer['mint'] = account_data_info['mint'] + if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]: + await get_token_metadata_symbol(transfer['mint']) + # get actual prices + current_price = await get_token_prices([transfer['mint']]) + + if parsed_result["token_in"] is None: + parsed_result["token_in"] = transfer['mint'] + parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol'] + parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] + parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']]) + elif parsed_result["token_out"] is None: + parsed_result["token_out"] = transfer['mint'] + parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol'] + parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] + parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price'] + + pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', []) + for balance in pre_balalnces: + if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET: + parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals'] + break + + + # Calculate percentage swapped + try: + if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0: + parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100 + else: + # calculate based on total wallet value: FOLLOWED_WALLET_VALUE + parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100 + except Exception as e: + logging.error(f"Error calculating percentage swapped: {e}") + + return parsed_result + + except requests.exceptions.RequestException as e: + print("Error fetching transaction details:", e) + -if __name__ == "__main__": - asyncio.run(wallet_watch_loop()) \ No newline at end of file + +class SolanaDEX: + def __init__(self, DISPLAY_CURRENCY): + self.DISPLAY_CURRENCY = DISPLAY_CURRENCY + pass + + async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: + global TOKENS_INFO + + # Skip for USD + prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"} + remaining_tokens = [addr for addr in token_addresses if addr not in prices] + + # Try CoinGecko + coingecko_prices = await get_prices_from_coingecko(remaining_tokens) + prices.update(coingecko_prices) + + + # For remaining missing tokens, try Jupiter + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + jupiter_prices = await get_prices_from_jupiter(list(missing_tokens)) + prices.update(jupiter_prices) + + + # For tokens not found in CoinGecko, use DexScreener + missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys()) + if missing_tokens: + dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens)) + prices.update(dexscreener_prices) + + # For remaining missing tokens, try Raydium + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + raydium_prices = await get_prices_from_raydium(list(missing_tokens)) + prices.update(raydium_prices) + + # For remaining missing tokens, try Orca + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + orca_prices = await get_prices_from_orca(list(missing_tokens)) + prices.update(orca_prices) + + # If any tokens are still missing, set their prices to 0 + for token in set(token_addresses) - set(prices.keys()): + prices[token] = 0.0 + logging.warning(f"Price not found for token {token}. Setting to 0.") + + for token, price in prices.items(): + token_info = TOKENS_INFO.setdefault(token, {}) + if 'symbol' not in token_info: + token_info['symbol'] = await get_token_metadata_symbol(token) + token_info['price'] = price + + return prices + + async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]: + base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana" + prices = {} + + async def fetch_single_price(session, address): + params = { + "contract_addresses": address, + "vs_currencies": DISPLAY_CURRENCY.lower() + } + try: + async with session.get(base_url, params=params) as response: + if response.status == 200: + data = await response.json() + if address in data and DISPLAY_CURRENCY.lower() in data[address]: + return address, data[address][DISPLAY_CURRENCY.lower()] + else: + logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}") + return address, None + + async with aiohttp.ClientSession() as session: + tasks = [fetch_single_price(session, address) for address in token_addresses] + results = await asyncio.gather(*tasks) + + for address, price in results: + if price is not None: + prices[address] = price + + return prices + + async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]: + base_url = "https://api.dexscreener.com/latest/dex/tokens/" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] + results = await asyncio.gather(*tasks) + + for address, result in zip(token_addresses, results): + if result and 'pairs' in result and result['pairs']: + pair = result['pairs'][0] # Use the first pair (usually the most liquid) + prices[address] = float(pair['priceUsd']) + else: + logging.warning(f"No price data found on DexScreener for token {address}") + except Exception as e: + logging.error(f"Error fetching token prices from DexScreener: {str(e)}") + + return prices + + async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]: + url = "https://price.jup.ag/v4/price" + params = { + "ids": ",".join(token_addresses) + } + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + for address, price_info in data.get('data', {}).items(): + if 'price' in price_info: + prices[address] = float(price_info['price']) + else: + logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Jupiter: {str(e)}") + return prices + + # New function for Raydium + async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]: + url = "https://api.raydium.io/v2/main/price" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + for address in token_addresses: + if address in data: + prices[address] = float(data[address]) + else: + logging.error(f"Failed to get token prices from Raydium. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Raydium: {str(e)}") + return prices + + # New function for Orca + async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]: + url = "https://api.orca.so/allTokens" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + for token_info in data: + if token_info['mint'] in token_addresses: + prices[token_info['mint']] = float(token_info['price']) + else: + logging.error(f"Failed to get token prices from Orca. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Orca: {str(e)}") + return prices + + async def fetch_token_data(session, url): + try: + async with session.get(url) as response: + if response.status == 200: + return await response.json() + else: + logging.error(f"Failed to fetch data from {url}. Status: {response.status}") + return None + except Exception as e: + logging.error(f"Error fetching data from {url}: {str(e)}") + return None + + async def get_sol_price() -> float: + sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address + return await get_token_prices([sol_address]).get(sol_address, 0.0) + + async def get_wallet_balances(wallet_address, doGetTokenName=True): + balances = {} + logging.info(f"Getting balances for wallet: {wallet_address}") + global TOKENS_INFO + try: + response = await solana_client.get_token_accounts_by_owner_json_parsed( + Pubkey.from_string(wallet_address), + opts=TokenAccountOpts( + program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") + ), + commitment=Confirmed + ) + + if response.value: + for account in response.value: + try: + parsed_data = account.account.data.parsed + if isinstance(parsed_data, dict) and 'info' in parsed_data: + info = parsed_data['info'] + if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info: + mint = info['mint'] + decimals = info['tokenAmount']['decimals'] + amount = float(info['tokenAmount']['amount'])/10**decimals + if amount > 0: + if mint in TOKENS_INFO: + token_name = TOKENS_INFO[mint].get('symbol') + elif doGetTokenName: + token_name = await get_token_metadata_symbol(mint) or 'N/A' + # sleep for 1 second to avoid rate limiting + await asyncio.sleep(2) + + TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals) + TOKENS_INFO[mint]['decimals'] = decimals + balances[mint] = { + 'name': token_name or 'N/A', + 'address': mint, + 'amount': amount, + 'decimals': decimals + } + # sleep for 1 second to avoid rate limiting + logging.debug(f"Account balance for {token_name} ({mint}): {amount}") + else: + logging.warning(f"Unexpected data format for account: {account}") + except Exception as e: + logging.error(f"Error parsing account data: {str(e)}") + + sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) + if sol_balance.value is not None: + balances['SOL'] = { + 'name': 'SOL', + 'address': 'SOL', + 'amount': sol_balance.value / 1e9 + } + else: + logging.warning(f"SOL balance response missing for wallet: {wallet_address}") + + except Exception as e: + logging.error(f"Error getting wallet balances: {str(e)}") + logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") + return balances + + async def convert_balances_to_currency(balances , sol_price): + converted_balances = {} + for address, info in balances.items(): + converted_balance = info.copy() # Create a copy of the original info + if info['name'] == 'SOL': + converted_balance['value'] = info['amount'] * sol_price + elif address in TOKEN_PRICES: + converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] + else: + converted_balance['value'] = None # Price not available + logging.warning(f"Price not available for token {info['name']} ({address})") + converted_balances[address] = converted_balance + return converted_balances + + + async def list_initial_wallet_states(): + global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES + global TOKENS_INFO # new + + followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) + your_wallet_balances = await get_wallet_balances(YOUR_WALLET) + + all_token_addresses = list(set(followed_wallet_balances.keys()) | + set(your_wallet_balances.keys()) | + set(TOKEN_ADDRESSES.values())) + + TOKEN_PRICES = await get_token_prices(all_token_addresses) + sol_price = await get_sol_price() + + followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) + your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) + + + TOKEN_ADDRESSES = { + address: info for address, + info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0 + } + logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") + + followed_wallet_state = [] + FOLLOWED_WALLET_VALUE = 0 + for address, info in followed_converted_balances.items(): + if info['value'] is not None and info['value'] > 0: + followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})") + FOLLOWED_WALLET_VALUE += info['value'] + + your_wallet_state = [] + YOUR_WALLET_VALUE = 0 + for address, info in your_converted_balances.items(): + if info['value'] is not None and info['value'] > 0: + your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") + YOUR_WALLET_VALUE += info['value'] + + message = ( + f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" + f"Followed Wallet ({FOLLOWED_WALLET}):\n" + f"{chr(10).join(followed_wallet_state)}\n" + f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Your Wallet ({YOUR_WALLET}):\n" + f"{chr(10).join(your_wallet_state)}\n" + f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Monitored Tokens:\n" + f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}" + ) + + logging.info(message) + await telegram_utils.send_telegram_message(message) + + # save token info to file + await save_token_info() + + + +#example +# async def main(): +# await telegram_utils.initialize() + +# async def process_log(log): +# print(f"Processing log: {log}") + +# async def list_initial_wallet_states(): +# print("Listing initial wallet states") + + +# wallet_watch_task = asyncio.create_task(solana_api.wallet_watch_loop()) + +# try: +# await asyncio.gather(wallet_watch_task) +# except asyncio.CancelledError: +# pass +# finally: +# await telegram_utils.close() + +# if __name__ == "__main__": +# asyncio.run(main()) \ No newline at end of file diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py index b70b567..6ed7ab7 100644 --- a/crypto/sol/modules/utils.py +++ b/crypto/sol/modules/utils.py @@ -28,7 +28,7 @@ class TelegramUtils: await self.initialize() try: - await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) + # await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) logging.info(f"Telegram message sent: {message}") except Exception as e: logging.error(f"Error sending Telegram message: {str(e)}")