getting ballances works

This commit is contained in:
Dobromir Popov 2024-10-03 01:33:26 +03:00
parent 59c9dd5f74
commit b9405424ea
4 changed files with 210 additions and 22 deletions

View File

@ -2,7 +2,9 @@ SOLANA_NET_URL="wss://api.mainnet-beta.solana.com"
DEVELOPER_CHAT_ID="777826553"
# Niki's
# FOLLOWED_WALLET="9U7D916zuQ8qcL9kQZqkcroWhHGho5vD8VNekvztrutN"
# My test
# My test Brave sync wallet
FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV"
YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5"
TELEGRAM_BOT_TOKEN="6805059978:AAHNJKuOeazMSJHc3-BXRCsFfEVyFHeFnjw"
DISPLAY_CURRENCY=USD

View File

@ -17,6 +17,8 @@ from solana.rpc.types import TokenAccountOpts
import base64
import os
from dotenv import load_dotenv
import aiohttp
from typing import List, Dict
@ -33,6 +35,7 @@ FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET")
YOUR_WALLET = os.getenv("YOUR_WALLET")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
SOLANA_URL = os.getenv("SOLANA_NET_URL")
DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD')
# Initialize Telegram Bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)
@ -54,6 +57,142 @@ async def send_telegram_message(message):
except Exception as e:
logging.error(f"Error sending Telegram message: {str(e)}")
# async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
# url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
# params = {
# "contract_addresses": ",".join(token_addresses),
# "vs_currencies": DISPLAY_CURRENCY.lower()
# }
# prices = {}
# async with aiohttp.ClientSession() as session:
# async with session.get(url, params=params) as response:
# if response.status == 200:
# data = await response.json()
# for address, price_info in data.items():
# if DISPLAY_CURRENCY.lower() in price_info:
# prices[address] = price_info[DISPLAY_CURRENCY.lower()]
# else:
# logging.error(f"Failed to get token prices. Status: {response.status}")
# # For tokens not found in CoinGecko, try to get price from a DEX or set a default value
# missing_tokens = set(token_addresses) - set(prices.keys())
# for token in missing_tokens:
# # You might want to implement a fallback method here, such as:
# # prices[token] = await get_price_from_dex(token)
# # For now, we'll set a default value
# prices[token] = 0.0
# logging.warning(f"Price not found for token {token}. Setting to 0.")
# return prices
async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
coingecko_prices = await get_prices_from_coingecko(token_addresses)
# For tokens not found in CoinGecko, use DexScreener
missing_tokens = set(token_addresses) - set(coingecko_prices.keys())
if missing_tokens:
dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens))
coingecko_prices.update(dexscreener_prices)
# If any tokens are still missing, set their prices to 0
for token in set(token_addresses) - set(coingecko_prices.keys()):
coingecko_prices[token] = 0.0
logging.warning(f"Price not found for token {token}. Setting to 0.")
return coingecko_prices
async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
params = {
"contract_addresses": ",".join(token_addresses),
"vs_currencies": DISPLAY_CURRENCY.lower()
}
prices = {}
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
for address, price_info in data.items():
if DISPLAY_CURRENCY.lower() in price_info:
prices[address] = price_info[DISPLAY_CURRENCY.lower()]
else:
logging.error(f"Failed to get token prices from CoinGecko. Status: {response.status}")
return prices
async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
base_url = "https://api.dexscreener.com/latest/dex/tokens/"
prices = {}
async with aiohttp.ClientSession() as session:
tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
results = await asyncio.gather(*tasks)
for address, result in zip(token_addresses, results):
if result and 'pairs' in result and result['pairs']:
pair = result['pairs'][0] # Use the first pair (usually the most liquid)
prices[address] = float(pair['priceUsd'])
else:
logging.warning(f"No price data found on DexScreener for token {address}")
return prices
async def fetch_token_data(session, url):
try:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
else:
logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
return None
except Exception as e:
logging.error(f"Error fetching data from {url}: {str(e)}")
return None
async def get_sol_price() -> float:
url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
return data['solana'][DISPLAY_CURRENCY.lower()]
else:
logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}")
return await get_sol_price_from_dexscreener()
async def get_sol_price_from_dexscreener() -> float:
sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address
prices = await get_prices_from_dexscreener([sol_address])
return prices.get(sol_address, 0.0)
async def get_sol_price() -> float:
url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
return data['solana'][DISPLAY_CURRENCY.lower()]
else:
logging.error(f"Failed to get SOL price. Status: {response.status}")
return None
async def convert_balances_to_currency(balances, token_prices, sol_price):
converted_balances = {}
for token, amount in balances.items():
if token == 'SOL':
converted_balances[token] = amount * sol_price
elif token in token_prices:
converted_balances[token] = amount * token_prices[token]
else:
converted_balances[token] = None # Price not available
logging.warning(f"Price not available for token {token}")
return converted_balances
async def get_token_balance(wallet_address, token_address):
try:
response = await solana_client.get_token_accounts_by_owner(
@ -84,6 +223,7 @@ class SolanaEncoder(json.JSONEncoder):
async def get_wallet_balances(wallet_address):
balances = {}
token_addresses = []
logging.info(f"Getting balances for wallet: {wallet_address}")
try:
@ -99,12 +239,14 @@ async def get_wallet_balances(wallet_address):
for account in response.value:
parsed_data = account.account.data.parsed
if isinstance(parsed_data, dict) and 'info' in parsed_data:
mint = parsed_data['info']['mint']
balance_response = await solana_client.get_token_account_balance(account.pubkey)
if balance_response.value:
amount = float(balance_response.value.ui_amount)
balances[mint] = amount
logging.debug(f"Balance for {mint}: {amount}")
info = parsed_data['info']
if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info:
mint = info['mint']
amount = float(info['tokenAmount']['uiAmount'])
if amount > 0:
token_addresses.append(mint)
balances[mint] = amount
logging.debug(f"Balance for {mint}: {amount}")
else:
logging.warning(f"Unexpected data format for account: {account}")
@ -117,12 +259,36 @@ async def get_wallet_balances(wallet_address):
except Exception as e:
logging.error(f"Error getting wallet balances: {str(e)}")
return balances
return balances, token_addresses
async def get_converted_balances(wallet_address):
balances, token_addresses = await get_wallet_balances(wallet_address)
token_prices = await get_token_prices(token_addresses)
sol_price = await get_sol_price()
converted_balances = await convert_balances_to_currency(balances, token_prices, sol_price)
return converted_balances
async def send_initial_wallet_states(followed_wallet, your_wallet):
followed_balances = await get_converted_balances(followed_wallet)
your_balances = await get_converted_balances(your_wallet)
message = f"Initial Wallet States (Non-zero balances in {DISPLAY_CURRENCY}):\n\n"
message += f"Followed Wallet ({followed_wallet}):\n"
for token, amount in followed_balances.items():
if amount and amount > 0:
message += f"{token}: {amount:.2f}\n"
message += f"\nYour Wallet ({your_wallet}):\n"
for token, amount in your_balances.items():
if amount and amount > 0:
message += f"{token}: {amount:.2f}\n"
message += "\nMonitored Tokens:\n"
# Add monitored tokens logic here if needed
await bot.send_message(chat_id=CHAT_ID, text=message)
@ -136,27 +302,45 @@ async def get_non_zero_token_balances(wallet_address):
logging.debug(f"Non-zero balance for {token}: {balance}")
return non_zero_balances
async def list_initial_wallet_states():
global TOKEN_ADDRESSES
global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE
followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
followed_wallet_balances, followed_token_addresses = await get_wallet_balances(FOLLOWED_WALLET)
your_wallet_balances, your_token_addresses = await get_wallet_balances(YOUR_WALLET)
followed_non_zero = await get_non_zero_token_balances(FOLLOWED_WALLET)
your_non_zero = await get_non_zero_token_balances(YOUR_WALLET)
all_token_addresses = list(set(followed_token_addresses + your_token_addresses))
token_prices = await get_token_prices(all_token_addresses)
sol_price = await get_sol_price()
TOKEN_ADDRESSES = {**followed_non_zero, **your_non_zero}
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, token_prices, sol_price)
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, token_prices, sol_price)
TOKEN_ADDRESSES = {token: amount for token, amount in {**followed_converted_balances, **your_converted_balances}.items() if amount is not None and amount > 0}
logging.info(f"Monitoring balances for tokens: {TOKEN_ADDRESSES.keys()}")
followed_wallet_state = "\n".join([f"{token}: {amount:.6f}" for token, amount in followed_wallet_balances.items() if amount > 0])
your_wallet_state = "\n".join([f"{token}: {amount:.6f}" for token, amount in your_wallet_balances.items() if amount > 0])
followed_wallet_state = []
FOLLOWED_WALLET_VALUE = 0
for token, amount in followed_converted_balances.items():
if amount is not None and amount > 0:
followed_wallet_state.append(f"{token}: {amount:.2f} {DISPLAY_CURRENCY}")
FOLLOWED_WALLET_VALUE += amount
your_wallet_state = []
YOUR_WALLET_VALUE = 0
for token, amount in your_converted_balances.items():
if amount is not None and amount > 0:
your_wallet_state.append(f"{token}: {amount:.2f} {DISPLAY_CURRENCY}")
YOUR_WALLET_VALUE += amount
message = (
"<b>Initial Wallet States (Non-zero balances):</b>\n\n"
f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n"
f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
f"{followed_wallet_state}\n\n"
f"{chr(10).join(followed_wallet_state)}\n"
f"<b>Total Value:</b> {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
f"{your_wallet_state}\n\n"
f"{chr(10).join(your_wallet_state)}\n"
f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
f"<b>Monitored Tokens:</b>\n"
f"{', '.join(TOKEN_ADDRESSES.keys())}"
)
@ -164,6 +348,7 @@ async def list_initial_wallet_states():
logging.info(message)
await send_telegram_message(message)
async def follow_move(move):
followed_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_balances = await get_wallet_balances(YOUR_WALLET)

View File

@ -4,3 +4,4 @@ dexscreener
python-telegram-bot
asyncio
base58
aiohttp

View File

@ -3,7 +3,7 @@ To run this Python Solana agent:
Install the required libraries:
`pip install flask solana dexscreener python-telegram-bot asyncio base58`
`pip install flask solana dexscreener python-telegram-bot asyncio base58 aiohttp`
Replace REPLACE_WITH_WALLET_ADDRESS with the wallet address you want to follow.
Replace REPLACE_WITH_YOUR_WALLET_ADDRESS with your own wallet address.