From 6f262078fc435f5bfad4ace5d26d4e72afb519c0 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 8 Oct 2024 12:43:28 +0300 Subject: [PATCH] get token metadata (name, symbol) --- crypto/sol/app.py | 194 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 175 insertions(+), 19 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 5dabd16..645c76e 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -8,6 +8,7 @@ from solana.rpc.websocket_api import connect from solana.rpc.types import TokenAccountOpts, TxOpts from solana.rpc.commitment import Confirmed, Processed from solana.transaction import Transaction +from spl.token.client import Token from base64 import b64decode import base58 from solders.rpc.requests import GetTransaction @@ -27,6 +28,7 @@ from telegram import Bot from telegram.constants import ParseMode import datetime import logging +from logging.handlers import RotatingFileHandler import base64 import os from dotenv import load_dotenv,set_key @@ -37,13 +39,29 @@ import threading import re from typing import List, Dict, Any, Tuple + load_dotenv() -# Load secret environment variables -load_dotenv('.env.secret') +load_dotenv('.env.secret') # ToDo - make it work + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG) + +# Set up error logger +error_file_handler = RotatingFileHandler( + './logs/error.log', + maxBytes=10*1024*1024, # 10MB + backupCount=5 +) +error_file_handler.setLevel(logging.ERROR) +error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') ) +error_logger = logging.getLogger('error_logger') +error_logger.setLevel(logging.ERROR) +error_logger.addHandler(error_file_handler) + pk = os.getenv("PK") if not pk: try: - with open('./secret.pk', 'r') as f: + with open('./crypto/sol/secret.pk', 'r') as f: pk = f.read().strip() if pk: logging.info("Private key loaded successfully from file.") @@ -134,6 +152,8 @@ TOKEN_ADDRESSES = { "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og", } +TOKENS_INFO = {} + # # # # # # # # # # TELEGRAM # # # # # # # # # # async def send_telegram_message(message): @@ -284,16 +304,151 @@ async def get_token_balance_rpc(wallet_address, token_address): # # # solders/solana libs (solana_client) # # # +from spl.token._layouts import MINT_LAYOUT +from solana.rpc.api import Client, Pubkey +from spl.token.async_client import AsyncToken + +from spl.token.constants import TOKEN_PROGRAM_ID +from borsh_construct import String, CStruct async def get_token_name(mint_address): + global TOKENS_INFO + if mint_address in TOKENS_INFO: + return TOKENS_INFO[mint_address].get('symbol') try: + account_data_result = await solana_jsonrpc("getAccountInfo", mint_address) + if 'value' in account_data_result and 'data' in account_data_result['value']: + account_data_data = account_data_result['value']['data'] + if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: + account_data_info = account_data_data['parsed']['info'] + if 'tokenName' in account_data_info: + TOKENS_INFO[mint_address] = account_data_info + return account_data_info['tokenName'] + if 'decimals' in account_data_info: + if mint_address in TOKENS_INFO: + TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals'] + else: + TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']} + token_address = Pubkey.from_string(mint_address) + # token = Token(solana_client, token_address, TOKEN_PROGRAM_ID, Keypair()) + token = AsyncToken(solana_client, token_address, TOKEN_PROGRAM_ID, Keypair()) + tokenInfo = await token.get_mint_info() token_info = await solana_client.get_account_info_json_parsed(Pubkey.from_string(mint_address)) - if token_info.value and 'symbol' in token_info.value: - return token_info.value['symbol'] + # if token_info.value and 'symbol' in token_info.value: + # return token_info.value['symbol'] + metadata = await get_token_metadata(mint_address) + if metadata: + if mint_address in TOKENS_INFO: + TOKENS_INFO[mint_address].update(metadata) + else: + TOKENS_INFO[mint_address] = metadata + return metadata.get('symbol') or metadata.get('name') + # TOKENS_INFO[mint_address] = metadata + # return metadata.get('symbol') or metadata.get('name') + except Exception as e: logging.error(f"Error fetching token name for {mint_address}: {str(e)}") return None + +METADATA_STRUCT = CStruct( + "update_authority" / String, + "mint" / String, + "name" / String, + "symbol" / String, + "uri" / String, + # ... other fields ... +) +import struct +def get_token_name_metadata(metadata_account_data): + try: + # Skip the first 1 + 32 + 32 bytes (1 byte for version, 32 bytes each for update authority and mint) + offset = 1 + 32 + 32 + + # Read the name length (u32) + name_length = struct.unpack(" update_authority +# stream read less than specified amount, expected 2189641476, found 675 + + # # Parse metadata + # key = data[:4] + # update_authority = Pubkey(data[4:36]) + # mint = Pubkey(data[36:68]) + # name_length = struct.unpack(' 0 and data[0] == 4: + # name_len = int.from_bytes(data[1:5], byteorder="little") + # name = data[5:5+name_len].decode("utf-8").strip("\x00") + + # symbol_len = int.from_bytes(data[5+name_len:9+name_len], byteorder="little") + # symbol = data[9+name_len:9+name_len+symbol_len].decode("utf-8").strip("\x00") + + # return {"name": name, "symbol": symbol} + except Exception as e: + logging.error(f"Error fetching token metadata for {mint_address}: {str(e)}") + + return None + + async def get_wallet_balances(wallet_address): balances = {} logging.info(f"Getting balances for wallet: {wallet_address}") @@ -419,7 +574,7 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False): transaction_details = json.load(f) return transaction_details else: - transaction_details = await solana_jsonrpc("getTransaction", [tx_signature]) + transaction_details = await solana_jsonrpc("getTransaction", tx_signature) with open('./logs/transation_details.json', 'w') as f: json.dump(transaction_details, f, indent=2) @@ -495,7 +650,7 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False): for transfer in [transfers[0], transfers[-1]]: if transfer['mint'] is None: # Query the Solana network for the account data - account_data_result = await solana_jsonrpc("getAccountInfo", [transfer['source']]) + account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source']) if 'value' in account_data_result and 'data' in account_data_result['value']: account_data_value = account_data_result['value'] @@ -545,6 +700,9 @@ async def solana_jsonrpc(method, params = None, jsonParsed = True): # } # ] # } + # if param is not array, make it array + if not isinstance(params, list): + params = [params] data = { "jsonrpc": "2.0", @@ -588,7 +746,10 @@ async def list_initial_wallet_states(): followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) - TOKEN_ADDRESSES = {address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0} + TOKEN_ADDRESSES = { + address: info for address, + info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0 + } logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") followed_wallet_state = [] @@ -801,15 +962,9 @@ async def follow_move(move): if your_balance >= amount_to_swap: try: - pk_b58 = os.getenv("PK") - if not pk_b58: - logging.error("Private key not found. Can not sign transactions without private key.") - return - - private_key = Keypair.from_bytes(base58.b58decode(pk_b58)) + private_key = Keypair.from_bytes(base58.b58decode(pk)) async_client = AsyncClient(SOLANA_WS_URL) - jupiter = Jupiter(async_client, private_key) - + jupiter = Jupiter(async_client, private_key) # Convert to lamports # if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9 amount = int(amount_to_swap * 10**your_balance_info['decimals']) @@ -866,6 +1021,9 @@ async def follow_move(move): except Exception as e: error_message = f"Swap Follow Error:\n{str(e)}" logging.error(error_message) + # log the errors to /logs/errors.log + error_logger.error(error_message) + error_logger.exception(e) # await send_telegram_message(error_message) else: msg = ( @@ -958,14 +1116,12 @@ async def subscribe_to_wallet(): reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay) -logger = logging.getLogger(__name__) async def main(): # Initialize logging - logging.basicConfig(level=logging.DEBUG) await send_telegram_message("Solana Agent Started. Connecting to mainnet...") - # await list_initial_wallet_states() + await list_initial_wallet_states() await subscribe_to_wallet() def run_flask():