get token metadata (name, symbol)

This commit is contained in:
Dobromir Popov 2024-10-08 12:43:28 +03:00
parent de03e32d75
commit 6f262078fc

View File

@ -8,6 +8,7 @@ from solana.rpc.websocket_api import connect
from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.commitment import Confirmed, Processed
from solana.transaction import Transaction
from spl.token.client import Token
from base64 import b64decode
import base58
from solders.rpc.requests import GetTransaction
@ -27,6 +28,7 @@ from telegram import Bot
from telegram.constants import ParseMode
import datetime
import logging
from logging.handlers import RotatingFileHandler
import base64
import os
from dotenv import load_dotenv,set_key
@ -37,13 +39,29 @@ import threading
import re
from typing import List, Dict, Any, Tuple
load_dotenv()
# Load secret environment variables
load_dotenv('.env.secret')
load_dotenv('.env.secret') # ToDo - make it work
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
# Set up error logger
error_file_handler = RotatingFileHandler(
'./logs/error.log',
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
error_file_handler.setLevel(logging.ERROR)
error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') )
error_logger = logging.getLogger('error_logger')
error_logger.setLevel(logging.ERROR)
error_logger.addHandler(error_file_handler)
pk = os.getenv("PK")
if not pk:
try:
with open('./secret.pk', 'r') as f:
with open('./crypto/sol/secret.pk', 'r') as f:
pk = f.read().strip()
if pk:
logging.info("Private key loaded successfully from file.")
@ -134,6 +152,8 @@ TOKEN_ADDRESSES = {
"TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og",
}
TOKENS_INFO = {}
# # # # # # # # # # TELEGRAM # # # # # # # # # #
async def send_telegram_message(message):
@ -284,16 +304,151 @@ async def get_token_balance_rpc(wallet_address, token_address):
# # # solders/solana libs (solana_client) # # #
from spl.token._layouts import MINT_LAYOUT
from solana.rpc.api import Client, Pubkey
from spl.token.async_client import AsyncToken
from spl.token.constants import TOKEN_PROGRAM_ID
from borsh_construct import String, CStruct
async def get_token_name(mint_address):
global TOKENS_INFO
if mint_address in TOKENS_INFO:
return TOKENS_INFO[mint_address].get('symbol')
try:
account_data_result = await solana_jsonrpc("getAccountInfo", mint_address)
if 'value' in account_data_result and 'data' in account_data_result['value']:
account_data_data = account_data_result['value']['data']
if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
account_data_info = account_data_data['parsed']['info']
if 'tokenName' in account_data_info:
TOKENS_INFO[mint_address] = account_data_info
return account_data_info['tokenName']
if 'decimals' in account_data_info:
if mint_address in TOKENS_INFO:
TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals']
else:
TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']}
token_address = Pubkey.from_string(mint_address)
# token = Token(solana_client, token_address, TOKEN_PROGRAM_ID, Keypair())
token = AsyncToken(solana_client, token_address, TOKEN_PROGRAM_ID, Keypair())
tokenInfo = await token.get_mint_info()
token_info = await solana_client.get_account_info_json_parsed(Pubkey.from_string(mint_address))
if token_info.value and 'symbol' in token_info.value:
return token_info.value['symbol']
# if token_info.value and 'symbol' in token_info.value:
# return token_info.value['symbol']
metadata = await get_token_metadata(mint_address)
if metadata:
if mint_address in TOKENS_INFO:
TOKENS_INFO[mint_address].update(metadata)
else:
TOKENS_INFO[mint_address] = metadata
return metadata.get('symbol') or metadata.get('name')
# TOKENS_INFO[mint_address] = metadata
# return metadata.get('symbol') or metadata.get('name')
except Exception as e:
logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
return None
METADATA_STRUCT = CStruct(
"update_authority" / String,
"mint" / String,
"name" / String,
"symbol" / String,
"uri" / String,
# ... other fields ...
)
import struct
def get_token_name_metadata(metadata_account_data):
try:
# Skip the first 1 + 32 + 32 bytes (1 byte for version, 32 bytes each for update authority and mint)
offset = 1 + 32 + 32
# Read the name length (u32)
name_length = struct.unpack("<I", metadata_account_data[offset:offset+4])[0]
offset += 4
# Read the name
name = metadata_account_data[offset:offset+name_length].decode('utf-8')
return name
except Exception as e:
print(f"Error parsing metadata: {e}")
return None
async def get_token_metadata(mint_address):
try:
# Convert mint_address to PublicKey if it's a string
if isinstance(mint_address, str):
mint_pubkey = Pubkey.from_string(mint_address)
else:
mint_pubkey = mint_address
# Derive metadata account address
metadata_program_id = Pubkey.from_string("metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s")
metadata_account = Pubkey.find_program_address(
[b"metadata", bytes(metadata_program_id), bytes(mint_pubkey)],
metadata_program_id
)[0]
# Fetch metadata account info
metadata_account_info = await solana_client.get_account_info(metadata_account)
if metadata_account_info.value is not None:
data = metadata_account_info.value.data
# name = get_token_name_metadata(data).rstrip("\x00")
# Skip the first 1 + 32 + 32 bytes (1 byte for version, 32 bytes each for update authority and mint)
offset = 1 + 32 + 32
# Read the name length (u32)
name_length = struct.unpack("<I", data[offset:offset+4])[0]
offset += 4
# Read the name
name = data[offset:offset+name_length].decode('utf-8').rstrip("\x00")
offset += name_length
# Read the symbol length (u32)
symbol_length = struct.unpack("<I", data[offset:offset+4])[0]
offset += 4
# Read the symbol
symbol = data[offset:offset+symbol_length].decode('utf-8').rstrip("\x00")
# metadata = METADATA_STRUCT.parse(data)
# ERROR:root:Error fetching token metadata for 3psH1Mj1f7yUfaD5gh6Zj7epE8hhrMkMETgv5TshQA4o: Error in path (parsing) -> update_authority
# stream read less than specified amount, expected 2189641476, found 675
# # Parse metadata
# key = data[:4]
# update_authority = Pubkey(data[4:36])
# mint = Pubkey(data[36:68])
# name_length = struct.unpack('<I', data[68:72])[0]
# symbol_length = struct.unpack('<I', data[72:76])[0]
# uri_length = struct.unpack('<I', data[76:80])[0]
# name = data[80:80 + name_length].decode('utf-8')
# symbol = data[80 + name_length:80 + name_length + symbol_length].decode('utf-8')
# uri = data[80 + name_length + symbol_length:].decode('utf-8')
return {"name": name, "symbol": symbol, "address": mint_address}
# if metadata_account_info.value is not None:
# # Decode metadata
# #data = base58.b58decode(metadata_account_info.value.data)
# data = metadata_account_info.value.data
# if len(data) > 0 and data[0] == 4:
# name_len = int.from_bytes(data[1:5], byteorder="little")
# name = data[5:5+name_len].decode("utf-8").strip("\x00")
# symbol_len = int.from_bytes(data[5+name_len:9+name_len], byteorder="little")
# symbol = data[9+name_len:9+name_len+symbol_len].decode("utf-8").strip("\x00")
# return {"name": name, "symbol": symbol}
except Exception as e:
logging.error(f"Error fetching token metadata for {mint_address}: {str(e)}")
return None
async def get_wallet_balances(wallet_address):
balances = {}
logging.info(f"Getting balances for wallet: {wallet_address}")
@ -419,7 +574,7 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False):
transaction_details = json.load(f)
return transaction_details
else:
transaction_details = await solana_jsonrpc("getTransaction", [tx_signature])
transaction_details = await solana_jsonrpc("getTransaction", tx_signature)
with open('./logs/transation_details.json', 'w') as f:
json.dump(transaction_details, f, indent=2)
@ -495,7 +650,7 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False):
for transfer in [transfers[0], transfers[-1]]:
if transfer['mint'] is None:
# Query the Solana network for the account data
account_data_result = await solana_jsonrpc("getAccountInfo", [transfer['source']])
account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source'])
if 'value' in account_data_result and 'data' in account_data_result['value']:
account_data_value = account_data_result['value']
@ -545,6 +700,9 @@ async def solana_jsonrpc(method, params = None, jsonParsed = True):
# }
# ]
# }
# if param is not array, make it array
if not isinstance(params, list):
params = [params]
data = {
"jsonrpc": "2.0",
@ -588,7 +746,10 @@ async def list_initial_wallet_states():
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price)
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price)
TOKEN_ADDRESSES = {address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0}
TOKEN_ADDRESSES = {
address: info for address,
info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0
}
logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}")
followed_wallet_state = []
@ -801,15 +962,9 @@ async def follow_move(move):
if your_balance >= amount_to_swap:
try:
pk_b58 = os.getenv("PK")
if not pk_b58:
logging.error("Private key not found. Can not sign transactions without private key.")
return
private_key = Keypair.from_bytes(base58.b58decode(pk_b58))
private_key = Keypair.from_bytes(base58.b58decode(pk))
async_client = AsyncClient(SOLANA_WS_URL)
jupiter = Jupiter(async_client, private_key)
jupiter = Jupiter(async_client, private_key)
# Convert to lamports
# if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9
amount = int(amount_to_swap * 10**your_balance_info['decimals'])
@ -866,6 +1021,9 @@ async def follow_move(move):
except Exception as e:
error_message = f"<b>Swap Follow Error:</b>\n{str(e)}"
logging.error(error_message)
# log the errors to /logs/errors.log
error_logger.error(error_message)
error_logger.exception(e)
# await send_telegram_message(error_message)
else:
msg = (
@ -958,14 +1116,12 @@ async def subscribe_to_wallet():
reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)
logger = logging.getLogger(__name__)
async def main():
# Initialize logging
logging.basicConfig(level=logging.DEBUG)
await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
# await list_initial_wallet_states()
await list_initial_wallet_states()
await subscribe_to_wallet()
def run_flask():