big refactoring

This commit is contained in:
Dobromir Popov 2024-10-15 23:19:30 +03:00
parent 8d714a9801
commit d63d3d41bc
3 changed files with 612 additions and 655 deletions

View File

@ -5,7 +5,6 @@ from flask import Flask, render_template, request, jsonify
from solana.rpc.async_api import AsyncClient
from solana.transaction import Signature
from solana.rpc.websocket_api import connect
from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.commitment import Confirmed, Processed
from solana.transaction import Transaction
from spl.token.client import Token
@ -24,6 +23,7 @@ from solders.instruction import CompiledInstruction
from solders import message
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
from dexscreener import DexscreenerClient
from solana.rpc.types import TokenAccountOpts, TxOpts
import datetime
import logging
@ -55,38 +55,9 @@ from config import (
error_logger
)
from modules.utils import (get_token_prices, get_sol_price, get_wallet_balances, convert_balances_to_currency, get_swap_transaction_details)
from modules.SolanaAPI import SolanaAPI, solana_jsonrpc, wallet_watch_loop
from modules.utils import telegram_utils, send_telegram_message
# # config = load_config()
# load_dotenv()
# load_dotenv('.env.secret')
# # Configuration
# DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID")
# FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET")
# YOUR_WALLET = os.getenv("YOUR_WALLET")
# TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# SOLANA_WS_URL = os.getenv("SOLANA_WS_URL")
# SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL")
# DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD')
# BOT_NAME = os.getenv("BOT_NAME")
# logger = logging.getLogger(__name__)
# logging.basicConfig(level=logging.DEBUG)
# #logging.basicConfig(level=logging.INFO)
# # Set up error logger
# log_dir = './logs'
# log_file = os.path.join(log_dir, 'error.log')
# os.makedirs(log_dir, exist_ok=True)
# error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5)
# error_file_handler.setLevel(logging.ERROR)
# error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') )
# error_logger = logging.getLogger('error_logger')
# error_logger.setLevel(logging.ERROR)
# error_logger.addHandler(error_file_handler)
from modules.SolanaAPI import SolanaAPI, SolanaDEX
from modules.utils import telegram_utils
# Function to find the latest log file
@ -145,7 +116,7 @@ if not telegram_utils.bot:
asyncio.run(telegram_utils.initialize())
except Exception as e:
logging.error(f"Error initializing Telegram bot: {str(e)}")
# async def send_telegram_message(message):
# async def telegram_utils.send_telegram_message(message):
# try:
# await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML)
# logging.info(f"Telegram message sent: {message}")
@ -161,196 +132,6 @@ if not telegram_utils.bot:
# # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # #
async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
global TOKENS_INFO
# Skip for USD
prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"}
remaining_tokens = [addr for addr in token_addresses if addr not in prices]
# Try CoinGecko
coingecko_prices = await get_prices_from_coingecko(remaining_tokens)
prices.update(coingecko_prices)
# For remaining missing tokens, try Jupiter
missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens:
jupiter_prices = await get_prices_from_jupiter(list(missing_tokens))
prices.update(jupiter_prices)
# For tokens not found in CoinGecko, use DexScreener
missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys())
if missing_tokens:
dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens))
prices.update(dexscreener_prices)
# For remaining missing tokens, try Raydium
missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens:
raydium_prices = await get_prices_from_raydium(list(missing_tokens))
prices.update(raydium_prices)
# For remaining missing tokens, try Orca
missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens:
orca_prices = await get_prices_from_orca(list(missing_tokens))
prices.update(orca_prices)
# If any tokens are still missing, set their prices to 0
for token in set(token_addresses) - set(prices.keys()):
prices[token] = 0.0
logging.warning(f"Price not found for token {token}. Setting to 0.")
for token, price in prices.items():
token_info = TOKENS_INFO.setdefault(token, {})
if 'symbol' not in token_info:
token_info['symbol'] = await get_token_metadata_symbol(token)
token_info['price'] = price
return prices
async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
prices = {}
async def fetch_single_price(session, address):
params = {
"contract_addresses": address,
"vs_currencies": DISPLAY_CURRENCY.lower()
}
try:
async with session.get(base_url, params=params) as response:
if response.status == 200:
data = await response.json()
if address in data and DISPLAY_CURRENCY.lower() in data[address]:
return address, data[address][DISPLAY_CURRENCY.lower()]
else:
logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}")
return address, None
async with aiohttp.ClientSession() as session:
tasks = [fetch_single_price(session, address) for address in token_addresses]
results = await asyncio.gather(*tasks)
for address, price in results:
if price is not None:
prices[address] = price
return prices
async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
base_url = "https://api.dexscreener.com/latest/dex/tokens/"
prices = {}
try:
async with aiohttp.ClientSession() as session:
tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
results = await asyncio.gather(*tasks)
for address, result in zip(token_addresses, results):
if result and 'pairs' in result and result['pairs']:
pair = result['pairs'][0] # Use the first pair (usually the most liquid)
prices[address] = float(pair['priceUsd'])
else:
logging.warning(f"No price data found on DexScreener for token {address}")
except Exception as e:
logging.error(f"Error fetching token prices from DexScreener: {str(e)}")
return prices
async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]:
url = "https://price.jup.ag/v4/price"
params = {
"ids": ",".join(token_addresses)
}
prices = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
for address, price_info in data.get('data', {}).items():
if 'price' in price_info:
prices[address] = float(price_info['price'])
else:
logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching token prices from Jupiter: {str(e)}")
return prices
# New function for Raydium
async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]:
url = "https://api.raydium.io/v2/main/price"
prices = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
for address in token_addresses:
if address in data:
prices[address] = float(data[address])
else:
logging.error(f"Failed to get token prices from Raydium. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching token prices from Raydium: {str(e)}")
return prices
# New function for Orca
async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]:
url = "https://api.orca.so/allTokens"
prices = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
for token_info in data:
if token_info['mint'] in token_addresses:
prices[token_info['mint']] = float(token_info['price'])
else:
logging.error(f"Failed to get token prices from Orca. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching token prices from Orca: {str(e)}")
return prices
async def fetch_token_data(session, url):
try:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
else:
logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
return None
except Exception as e:
logging.error(f"Error fetching data from {url}: {str(e)}")
return None
async def get_sol_price() -> float:
url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
return data['solana'][DISPLAY_CURRENCY.lower()]
else:
logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}")
return await get_sol_price_from_dexscreener()
async def get_sol_price_from_dexscreener() -> float:
sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address
prices = await get_prices_from_dexscreener([sol_address])
return prices.get(sol_address, 0.0)
# # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # #
@ -415,43 +196,6 @@ from spl.token.async_client import AsyncToken
from spl.token.constants import TOKEN_PROGRAM_ID
from borsh_construct import String, CStruct
async def get_token_metadata_symbol(mint_address):
global TOKENS_INFO
if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]:
return TOKENS_INFO[mint_address].get('symbol')
try:
account_data_result = await solana_jsonrpc("getAccountInfo", mint_address)
if 'value' in account_data_result and 'data' in account_data_result['value']:
account_data_data = account_data_result['value']['data']
if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
account_data_info = account_data_data['parsed']['info']
if 'decimals' in account_data_info:
if mint_address in TOKENS_INFO:
TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals']
else:
TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']}
if 'tokenName' in account_data_info:
if mint_address in TOKENS_INFO:
TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName']
else:
TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']}
metadata = await get_token_metadata(mint_address)
if metadata:
if mint_address in TOKENS_INFO:
TOKENS_INFO[mint_address].update(metadata)
else:
TOKENS_INFO[mint_address] = metadata
await save_token_info()
# TOKENS_INFO[mint_address] = metadata
# return metadata.get('symbol') or metadata.get('name')
return TOKENS_INFO[mint_address].get('symbol')
except Exception as e:
logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
return None
@ -557,80 +301,6 @@ async def get_token_metadata(mint_address):
return None
async def get_wallet_balances(wallet_address, doGetTokenName=True):
balances = {}
logging.info(f"Getting balances for wallet: {wallet_address}")
global TOKENS_INFO
try:
response = await solana_client.get_token_accounts_by_owner_json_parsed(
Pubkey.from_string(wallet_address),
opts=TokenAccountOpts(
program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
),
commitment=Confirmed
)
if response.value:
for account in response.value:
try:
parsed_data = account.account.data.parsed
if isinstance(parsed_data, dict) and 'info' in parsed_data:
info = parsed_data['info']
if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info:
mint = info['mint']
decimals = info['tokenAmount']['decimals']
amount = float(info['tokenAmount']['amount'])/10**decimals
if amount > 0:
if mint in TOKENS_INFO:
token_name = TOKENS_INFO[mint].get('symbol')
elif doGetTokenName:
token_name = await get_token_metadata_symbol(mint) or 'N/A'
# sleep for 1 second to avoid rate limiting
await asyncio.sleep(2)
TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals)
TOKENS_INFO[mint]['decimals'] = decimals
balances[mint] = {
'name': token_name or 'N/A',
'address': mint,
'amount': amount,
'decimals': decimals
}
# sleep for 1 second to avoid rate limiting
logging.debug(f"Account balance for {token_name} ({mint}): {amount}")
else:
logging.warning(f"Unexpected data format for account: {account}")
except Exception as e:
logging.error(f"Error parsing account data: {str(e)}")
sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address))
if sol_balance.value is not None:
balances['SOL'] = {
'name': 'SOL',
'address': 'SOL',
'amount': sol_balance.value / 1e9
}
else:
logging.warning(f"SOL balance response missing for wallet: {wallet_address}")
except Exception as e:
logging.error(f"Error getting wallet balances: {str(e)}")
logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}")
return balances
async def convert_balances_to_currency(balances , sol_price):
converted_balances = {}
for address, info in balances.items():
converted_balance = info.copy() # Create a copy of the original info
if info['name'] == 'SOL':
converted_balance['value'] = info['amount'] * sol_price
elif address in TOKEN_PRICES:
converted_balance['value'] = info['amount'] * TOKEN_PRICES[address]
else:
converted_balance['value'] = None # Price not available
logging.warning(f"Price not available for token {info['name']} ({address})")
converted_balances[address] = converted_balance
return converted_balances
async def get_swap_transaction_details(tx_signature_str):
@ -676,200 +346,9 @@ async def get_swap_transaction_details(tx_signature_str):
return None
# # # RAW Solana API RPC # # #
#this is the meat of the application
async def get_transaction_details_rpc(tx_signature, readfromDump=False):
global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO
try:
if readfromDump and os.path.exists('./logs/transation_details.json'):
with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details
transaction_details = json.load(f)
return transaction_details
else:
transaction_details = await solana_jsonrpc("getTransaction", tx_signature)
with open('./logs/transation_details.json', 'w') as f:
json.dump(transaction_details, f, indent=2)
if transaction_details is None:
logging.error(f"Error fetching transaction details for {tx_signature}")
return None
# Initialize default result structure
parsed_result = {
"order_id": None,
"token_in": None,
"token_out": None,
"amount_in": 0,
"amount_out": 0,
"amount_in_USD": 0,
"amount_out_USD": 0,
"percentage_swapped": 0
}
# Extract order_id from logs
log_messages = transaction_details.get("meta", {}).get("logMessages", [])
for log in log_messages:
if "order_id" in log:
parsed_result["order_id"] = log.split(":")[2].strip()
break
# Extract token transfers from innerInstructions
inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
for instruction_set in inner_instructions:
for instruction in instruction_set.get('instructions', []):
if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked':
info = instruction['parsed']['info']
mint = info['mint']
amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals
# Determine which token is being swapped in and out based on zero balances
if parsed_result["token_in"] is None and amount > 0:
parsed_result["token_in"] = mint
parsed_result["amount_in"] = amount
if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
# if we've failed to extract token_in and token_out from the transaction details, try a second method
inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
transfers = []
for instruction_set in inner_instructions:
for instruction in instruction_set.get('instructions', []):
if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']:
info = instruction['parsed']['info']
amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount'])
decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0
adjusted_amount = amount / (10 ** decimals)
# adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0))
transfers.append({
'mint': info.get('mint'),
'amount': adjusted_amount,
'source': info['source'],
'destination': info['destination']
})
# Identify token_in and token_out
if len(transfers) >= 2:
parsed_result["token_in"] = transfers[0]['mint']
parsed_result["amount_in"] = transfers[0]['amount']
parsed_result["token_out"] = transfers[-1]['mint']
parsed_result["amount_out"] = transfers[-1]['amount']
# If mint is not provided, query the Solana network for the account data
if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
#for transfer in transfers:
# do only first and last transfer
for transfer in [transfers[0], transfers[-1]]:
if transfer['mint'] is None:
# Query the Solana network for the account data
account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source'])
if 'value' in account_data_result and 'data' in account_data_result['value']:
account_data_value = account_data_result['value']
account_data_data = account_data_value['data']
if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
account_data_info = account_data_data['parsed']['info']
if 'mint' in account_data_info:
transfer['mint'] = account_data_info['mint']
if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]:
await get_token_metadata_symbol(transfer['mint'])
# get actual prices
current_price = await get_token_prices([transfer['mint']])
if parsed_result["token_in"] is None:
parsed_result["token_in"] = transfer['mint']
parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol']
parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']])
elif parsed_result["token_out"] is None:
parsed_result["token_out"] = transfer['mint']
parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol']
parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price']
pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', [])
for balance in pre_balalnces:
if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET:
parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals']
break
# Calculate percentage swapped
try:
if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0:
parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100
else:
# calculate based on total wallet value: FOLLOWED_WALLET_VALUE
parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100
except Exception as e:
logging.error(f"Error calculating percentage swapped: {e}")
return parsed_result
except requests.exceptions.RequestException as e:
print("Error fetching transaction details:", e)
# # # # # # # # # # Functionality # # # # # # # # # #
async def list_initial_wallet_states():
global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES
global TOKENS_INFO # new
followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
all_token_addresses = list(set(followed_wallet_balances.keys()) |
set(your_wallet_balances.keys()) |
set(TOKEN_ADDRESSES.values()))
TOKEN_PRICES = await get_token_prices(all_token_addresses)
sol_price = await get_sol_price()
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price)
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price)
TOKEN_ADDRESSES = {
address: info for address,
info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0
}
logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}")
followed_wallet_state = []
FOLLOWED_WALLET_VALUE = 0
for address, info in followed_converted_balances.items():
if info['value'] is not None and info['value'] > 0:
followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})")
FOLLOWED_WALLET_VALUE += info['value']
your_wallet_state = []
YOUR_WALLET_VALUE = 0
for address, info in your_converted_balances.items():
if info['value'] is not None and info['value'] > 0:
your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}")
YOUR_WALLET_VALUE += info['value']
message = (
f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n"
f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
f"{chr(10).join(followed_wallet_state)}\n"
f"<b>Total Value:</b> {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
f"{chr(10).join(your_wallet_state)}\n"
f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
f"<b>Monitored Tokens:</b>\n"
f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}"
)
logging.info(message)
await send_telegram_message(message)
# save token info to file
await save_token_info()
def safe_get_property(info, property_name, default='Unknown'):
if not isinstance(info, dict):
@ -887,7 +366,7 @@ async def get_transaction_details_with_retry(transaction_id, retry_delay = 8, ma
# qwery every 5 seconds for the transaction details untill not None or 30 seconds
for _ in range(max_retries):
try:
tx_details = await get_transaction_details_rpc(transaction_id)
tx_details = await solanaAPI.get_transaction_details_rpc(transaction_id)
if tx_details is not None:
break
except Exception as e:
@ -1015,13 +494,13 @@ async def process_log(log_result):
f"Amount In USD: {tr_details['amount_in_USD']:.2f}\n"
f"Percentage Swapped: {tr_details['percentage_swapped']:.2f}%"
)
await send_telegram_message(message_text)
await telegram_utils.send_telegram_message(message_text)
await follow_move(tr_details)
await save_token_info()
except Exception as e:
logging.error(f"Error aquiring log details and following: {e}")
await send_telegram_message(f"Not followed! Error following move.")
await telegram_utils.send_telegram_message(f"Not followed! Error following move.")
@ -1072,14 +551,14 @@ def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
async def follow_move(move):
your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
your_balances = await solanaAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
if your_balance_info is not None:
# Use the balance
print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
else:
print("No ballance found for {move['symbol_in']}. Skipping move.")
await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
return
your_balance = your_balance_info['amount']
@ -1087,12 +566,12 @@ async def follow_move(move):
token_info = TOKENS_INFO.get(move['token_in'])
token_name_in = token_info.get('symbol') or await get_token_metadata(move['token_in'])
token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await get_token_metadata_symbol(move['token_out'])
token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await solanaAPI.get_token_metadata_symbol(move['token_out'])
if not your_balance:
msg = f"<b>Move not followed:</b>\nNo balance found for token {move['symbol_in']}. Cannot follow move."
logging.warning(msg)
await send_telegram_message(msg)
await telegram_utils.send_telegram_message(msg)
return
# move["percentage_swapped"] = (move["amount_out"] / move["amount_in"]) * 100
@ -1119,7 +598,7 @@ async def follow_move(move):
f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway."
)
logging.warning(msg)
await send_telegram_message(msg)
await telegram_utils.send_telegram_message(msg)
try:
try:
@ -1129,7 +608,7 @@ async def follow_move(move):
)
# logging.info(notification)
# error_logger.info(notification)
# await send_telegram_message(notification)
# await telegram_utils.send_telegram_message(notification)
except Exception as e:
logging.error(f"Error sending notification: {e}")
@ -1159,7 +638,7 @@ async def follow_move(move):
# append to notification
notification += f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>{transaction_id}</a>"
await send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
tx_details = await get_transaction_details_with_retry(transaction_id)
if tx_details is not None:
@ -1173,7 +652,7 @@ async def follow_move(move):
# log the errors to /logs/errors.log
error_logger.error(error_message)
error_logger.exception(e)
await send_telegram_message(error_message)
await telegram_utils.send_telegram_message(error_message)
amount = amount * 0.75
await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
@ -1198,7 +677,7 @@ async def follow_move(move):
f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>{transaction_id}</a>"
)
logging.info(notification)
await send_telegram_message(notification)
await telegram_utils.send_telegram_message(notification)
except Exception as e:
logging.error(f"Error sending notification: {e}")
@ -1210,9 +689,9 @@ async def follow_move(move):
error_logger.exception(e) \
# if error_message contains 'Program log: Error: insufficient funds'
if 'insufficient funds' in error_message:
await send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.")
await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.")
else:
await send_telegram_message(error_message)
await telegram_utils.send_telegram_message(error_message)
# Helper functions
@ -1236,41 +715,45 @@ async def check_PK():
if not pk:
logging.error("Private key not found in environment variables. Will not be able to sign transactions.")
# send TG warning message
await send_telegram_message("<b>Warning:</b> Private key not found in environment variables. Will not be able to sign transactions.")
await telegram_utils.send_telegram_message("<b>Warning:</b> Private key not found in environment variables. Will not be able to sign transactions.")
solanaAPI = SolanaAPI(process_transaction_callback=process_log)
async def main():
global bot, PROCESSING_LOG
global solanaAPI, bot, PROCESSING_LOG
await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")
await check_PK()
# new: restart wallet_watch_loop every hour
while True:
wallet_watch_task = asyncio.create_task(wallet_watch_loop())
await solanaAPI.wallet_watch_loop()
try:
# Wait for an hour or until the task completes, whichever comes first
await asyncio.wait_for(wallet_watch_task, timeout=3600)
except asyncio.TimeoutError:
# If an hour has passed, cancel the task if not PROCESSING
if PROCESSING_LOG:
logging.info("wallet_watch_loop is processing logs. Will not restart.")
await send_telegram_message("wallet_watch_loop is processing logs. Will not restart.")
else:
wallet_watch_task.cancel()
try:
await wallet_watch_task
except asyncio.CancelledError:
logging.info("wallet_watch_loop was cancelled after running for an hour")
except Exception as e:
logging.error(f"Error in wallet_watch_loop: {str(e)}")
await send_telegram_message(f"Error in wallet_watch_loop: {str(e)}")
# while True:
# wallet_watch_task = asyncio.create_task(solanaAPI.wallet_watch_loop())
logging.info("Restarting wallet_watch_loop")
await send_telegram_message("Restarting wallet_watch_loop")
# try:
# # Wait for an hour or until the task completes, whichever comes first
# await asyncio.wait_for(wallet_watch_task, timeout=3600)
# except asyncio.TimeoutError:
# # If an hour has passed, cancel the task if not PROCESSING
# if PROCESSING_LOG:
# logging.info("wallet_watch_loop is processing logs. Will not restart.")
# await telegram_utils.send_telegram_message("wallet_watch_loop is processing logs. Will not restart.")
# else:
# wallet_watch_task.cancel()
# try:
# await wallet_watch_task
# except asyncio.CancelledError:
# logging.info("wallet_watch_loop was cancelled after running for an hour")
# except Exception as e:
# logging.error(f"Error in wallet_watch_loop: {str(e)}")
# await telegram_utils.send_telegram_message(f"Error in wallet_watch_loop: {str(e)}")
# logging.info("Restarting wallet_watch_loop")
# await telegram_utils.send_telegram_message("Restarting wallet_watch_loop")

View File

@ -1,5 +1,7 @@
import sys
import os
import aiohttp
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
@ -7,23 +9,21 @@ import json
import logging
import random
import websockets
from typing import Optional
from typing import Dict, List, Optional
import requests
import datetime
from datetime import datetime
from solana.rpc.types import TokenAccountOpts, TxOpts
logger = logging.getLogger(__name__)
SOLANA_ENDPOINTS = [
"wss://api.mainnet-beta.solana.com",
# "wss://solana-api.projectserum.com",
# "wss://rpc.ankr.com/solana",
# "wss://mainnet.rpcpool.com",
]
PING_INTERVAL = 30
SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes
SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 1 minute
from config import (
FOLLOWED_WALLET, SOLANA_HTTP_URL
FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY
)
from modules.utils import telegram_utils
@ -106,6 +106,7 @@ class SolanaWS:
async def process_messages(self):
while True:
message = await self.message_queue.get()
if self.on_message:
await self.on_message(message)
logger.info(f"Received message: {message}")
@ -113,78 +114,52 @@ class SolanaWS:
if self.websocket:
await self.websocket.close()
logger.info("WebSocket connection closed")
while True:
message = await self.message_queue.get()
try:
response_data = json.loads(message)
if 'params' in response_data:
log = response_data['params']['result']
await process_log(log)
else:
logger.warning(f"Unexpected response: {response_data}")
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred while processing message: {e}")
finally:
self.message_queue.task_done()
async def solana_jsonrpc(method, params=None, jsonParsed=True):
# target json example:
# data = {
# "jsonrpc": "2.0",
# "id": 1,
# "method": "getTransaction",
# "params": [
# tx_signature,
# {
# "encoding": "jsonParsed",
# "maxSupportedTransactionVersion": 0
# }
# ]
# }
# if param is not array, make it array
if not isinstance(params, list):
params = [params]
params = [params] if params is not None else []
data = {
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params or []
"params": params
}
data["params"].append({"maxSupportedTransactionVersion": 0})
if jsonParsed:
data["params"][1]["encoding"] = "jsonParsed"
if jsonParsed:
data["params"].append({"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0})
else:
data["params"].append({"maxSupportedTransactionVersion": 0})
try:
# url = 'https://solana.drpc.org'
response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data))
response.raise_for_status() # Raises an error for bad responses
response.raise_for_status()
result = response.json()
if not 'result' in result or 'error' in result:
print("Error fetching data from Solana RPC:", result)
if 'result' not in result or 'error' in result:
logger.error("Error fetching data from Solana RPC:", result)
return None
return result['result']
except Exception as e:
logging.error(f"Error fetching data from Solana RPC: {e}")
logger.error(f"Error fetching data from Solana RPC: {e}")
return None
class SolanaAPI:
def __init__(self, process_transaction_callback, on_initial_subscription_callback = None, on_bot_message=None):
self.process_transaction = process_transaction_callback
self.on_initial_subscription = on_initial_subscription_callback
self.on_bot_message = on_bot_message,
def __init__(self, process_log_callback, send_telegram_message_callback, list_initial_wallet_states_callback):
self.process_log = process_log_callback
self.list_initial_wallet_states = list_initial_wallet_states_callback
self.dex = SolanaDEX(DISPLAY_CURRENCY)
self.solana_ws = SolanaWS(on_message=self.process_transaction)
async def process_messages(self, solana_ws):
while True:
message = await solana_ws.message_queue.get()
await self.process_log(message)
await self.process_transaction(message)
async def wallet_watch_loop():
solana_ws = SolanaWS(on_message=process_log)
async def wallet_watch_loop(self):
solana_ws = SolanaWS(on_message=self.process_transaction)
first_subscription = True
while True:
@ -193,10 +168,10 @@ class SolanaAPI:
await solana_ws.subscribe()
if first_subscription:
asyncio.create_task(self.list_initial_wallet_states())
asyncio.create_task(self.on_initial_subscription())
first_subscription = False
await telegram_utils.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...")
await self.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...")
receive_task = asyncio.create_task(solana_ws.receive_messages())
process_task = asyncio.create_task(solana_ws.process_messages())
@ -214,20 +189,15 @@ class SolanaAPI:
finally:
await solana_ws.unsubscribe()
if solana_ws.websocket:
await solana_ws.websocket.close()
await telegram_utils.send_telegram_message("Reconnecting...")
await solana_ws.close()
await self.send_telegram_message("Reconnecting...")
await asyncio.sleep(5)
async def process_transaction(signature):
# Implement your logic to process each transaction
async def process_transaction(self, signature):
print(f"Processing transaction: {signature['signature']}")
# You can add more processing logic here, such as storing in a database,
# triggering notifications, etc.
# Example usage
# async def main():
# account_address = "Vote111111111111111111111111111111111111111"
# Add your transaction processing logic here
async def get_last_transactions(account_address, check_interval=300, limit=1000):
async def get_last_transactions(self, account_address, check_interval=300, limit=1000):
last_check_time = None
last_signature = None
@ -252,17 +222,521 @@ class SolanaAPI:
if last_signature and signature['signature'] == last_signature:
break
# Process the transaction
await process_transaction(signature)
await self.process_transaction(signature)
if result:
last_signature = result[0]['signature']
last_check_time = current_time
await asyncio.sleep(1) # Sleep for 1 second before checking again
await asyncio.sleep(1)
async def get_token_metadata_symbol(mint_address):
global TOKENS_INFO
if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]:
return TOKENS_INFO[mint_address].get('symbol')
try:
account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", mint_address)
if 'value' in account_data_result and 'data' in account_data_result['value']:
account_data_data = account_data_result['value']['data']
if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
account_data_info = account_data_data['parsed']['info']
if 'decimals' in account_data_info:
if mint_address in TOKENS_INFO:
TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals']
else:
TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']}
if 'tokenName' in account_data_info:
if mint_address in TOKENS_INFO:
TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName']
else:
TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']}
metadata = await get_token_metadata(mint_address)
if metadata:
if mint_address in TOKENS_INFO:
TOKENS_INFO[mint_address].update(metadata)
else:
TOKENS_INFO[mint_address] = metadata
await save_token_info()
# TOKENS_INFO[mint_address] = metadata
# return metadata.get('symbol') or metadata.get('name')
return TOKENS_INFO[mint_address].get('symbol')
except Exception as e:
logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
return None
async def get_transaction_details_rpc(tx_signature, readfromDump=False):
global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO
try:
if readfromDump and os.path.exists('./logs/transation_details.json'):
with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details
transaction_details = json.load(f)
return transaction_details
else:
transaction_details = await solana_jsonrpc("getTransaction", tx_signature)
with open('./logs/transation_details.json', 'w') as f:
json.dump(transaction_details, f, indent=2)
if transaction_details is None:
logging.error(f"Error fetching transaction details for {tx_signature}")
return None
# Initialize default result structure
parsed_result = {
"order_id": None,
"token_in": None,
"token_out": None,
"amount_in": 0,
"amount_out": 0,
"amount_in_USD": 0,
"amount_out_USD": 0,
"percentage_swapped": 0
}
# Extract order_id from logs
log_messages = transaction_details.get("meta", {}).get("logMessages", [])
for log in log_messages:
if "order_id" in log:
parsed_result["order_id"] = log.split(":")[2].strip()
break
# Extract token transfers from innerInstructions
inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
for instruction_set in inner_instructions:
for instruction in instruction_set.get('instructions', []):
if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked':
info = instruction['parsed']['info']
mint = info['mint']
amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals
# Determine which token is being swapped in and out based on zero balances
if parsed_result["token_in"] is None and amount > 0:
parsed_result["token_in"] = mint
parsed_result["amount_in"] = amount
if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
# if we've failed to extract token_in and token_out from the transaction details, try a second method
inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
transfers = []
for instruction_set in inner_instructions:
for instruction in instruction_set.get('instructions', []):
if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']:
info = instruction['parsed']['info']
amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount'])
decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0
adjusted_amount = amount / (10 ** decimals)
# adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0))
transfers.append({
'mint': info.get('mint'),
'amount': adjusted_amount,
'source': info['source'],
'destination': info['destination']
})
# Identify token_in and token_out
if len(transfers) >= 2:
parsed_result["token_in"] = transfers[0]['mint']
parsed_result["amount_in"] = transfers[0]['amount']
parsed_result["token_out"] = transfers[-1]['mint']
parsed_result["amount_out"] = transfers[-1]['amount']
# If mint is not provided, query the Solana network for the account data
if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
#for transfer in transfers:
# do only first and last transfer
for transfer in [transfers[0], transfers[-1]]:
if transfer['mint'] is None:
# Query the Solana network for the account data
account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source'])
if 'value' in account_data_result and 'data' in account_data_result['value']:
account_data_value = account_data_result['value']
account_data_data = account_data_value['data']
if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
account_data_info = account_data_data['parsed']['info']
if 'mint' in account_data_info:
transfer['mint'] = account_data_info['mint']
if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]:
await get_token_metadata_symbol(transfer['mint'])
# get actual prices
current_price = await get_token_prices([transfer['mint']])
if parsed_result["token_in"] is None:
parsed_result["token_in"] = transfer['mint']
parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol']
parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']])
elif parsed_result["token_out"] is None:
parsed_result["token_out"] = transfer['mint']
parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol']
parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price']
pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', [])
for balance in pre_balalnces:
if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET:
parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals']
break
# Calculate percentage swapped
try:
if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0:
parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100
else:
# calculate based on total wallet value: FOLLOWED_WALLET_VALUE
parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100
except Exception as e:
logging.error(f"Error calculating percentage swapped: {e}")
return parsed_result
except requests.exceptions.RequestException as e:
print("Error fetching transaction details:", e)
if __name__ == "__main__":
asyncio.run(wallet_watch_loop())
class SolanaDEX:
def __init__(self, DISPLAY_CURRENCY):
self.DISPLAY_CURRENCY = DISPLAY_CURRENCY
pass
async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
global TOKENS_INFO
# Skip for USD
prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"}
remaining_tokens = [addr for addr in token_addresses if addr not in prices]
# Try CoinGecko
coingecko_prices = await get_prices_from_coingecko(remaining_tokens)
prices.update(coingecko_prices)
# For remaining missing tokens, try Jupiter
missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens:
jupiter_prices = await get_prices_from_jupiter(list(missing_tokens))
prices.update(jupiter_prices)
# For tokens not found in CoinGecko, use DexScreener
missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys())
if missing_tokens:
dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens))
prices.update(dexscreener_prices)
# For remaining missing tokens, try Raydium
missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens:
raydium_prices = await get_prices_from_raydium(list(missing_tokens))
prices.update(raydium_prices)
# For remaining missing tokens, try Orca
missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens:
orca_prices = await get_prices_from_orca(list(missing_tokens))
prices.update(orca_prices)
# If any tokens are still missing, set their prices to 0
for token in set(token_addresses) - set(prices.keys()):
prices[token] = 0.0
logging.warning(f"Price not found for token {token}. Setting to 0.")
for token, price in prices.items():
token_info = TOKENS_INFO.setdefault(token, {})
if 'symbol' not in token_info:
token_info['symbol'] = await get_token_metadata_symbol(token)
token_info['price'] = price
return prices
async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
prices = {}
async def fetch_single_price(session, address):
params = {
"contract_addresses": address,
"vs_currencies": DISPLAY_CURRENCY.lower()
}
try:
async with session.get(base_url, params=params) as response:
if response.status == 200:
data = await response.json()
if address in data and DISPLAY_CURRENCY.lower() in data[address]:
return address, data[address][DISPLAY_CURRENCY.lower()]
else:
logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}")
return address, None
async with aiohttp.ClientSession() as session:
tasks = [fetch_single_price(session, address) for address in token_addresses]
results = await asyncio.gather(*tasks)
for address, price in results:
if price is not None:
prices[address] = price
return prices
async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
base_url = "https://api.dexscreener.com/latest/dex/tokens/"
prices = {}
try:
async with aiohttp.ClientSession() as session:
tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
results = await asyncio.gather(*tasks)
for address, result in zip(token_addresses, results):
if result and 'pairs' in result and result['pairs']:
pair = result['pairs'][0] # Use the first pair (usually the most liquid)
prices[address] = float(pair['priceUsd'])
else:
logging.warning(f"No price data found on DexScreener for token {address}")
except Exception as e:
logging.error(f"Error fetching token prices from DexScreener: {str(e)}")
return prices
async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]:
url = "https://price.jup.ag/v4/price"
params = {
"ids": ",".join(token_addresses)
}
prices = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
for address, price_info in data.get('data', {}).items():
if 'price' in price_info:
prices[address] = float(price_info['price'])
else:
logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching token prices from Jupiter: {str(e)}")
return prices
# New function for Raydium
async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]:
url = "https://api.raydium.io/v2/main/price"
prices = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
for address in token_addresses:
if address in data:
prices[address] = float(data[address])
else:
logging.error(f"Failed to get token prices from Raydium. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching token prices from Raydium: {str(e)}")
return prices
# New function for Orca
async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]:
url = "https://api.orca.so/allTokens"
prices = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
for token_info in data:
if token_info['mint'] in token_addresses:
prices[token_info['mint']] = float(token_info['price'])
else:
logging.error(f"Failed to get token prices from Orca. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching token prices from Orca: {str(e)}")
return prices
async def fetch_token_data(session, url):
try:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
else:
logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
return None
except Exception as e:
logging.error(f"Error fetching data from {url}: {str(e)}")
return None
async def get_sol_price() -> float:
sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address
return await get_token_prices([sol_address]).get(sol_address, 0.0)
async def get_wallet_balances(wallet_address, doGetTokenName=True):
balances = {}
logging.info(f"Getting balances for wallet: {wallet_address}")
global TOKENS_INFO
try:
response = await solana_client.get_token_accounts_by_owner_json_parsed(
Pubkey.from_string(wallet_address),
opts=TokenAccountOpts(
program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
),
commitment=Confirmed
)
if response.value:
for account in response.value:
try:
parsed_data = account.account.data.parsed
if isinstance(parsed_data, dict) and 'info' in parsed_data:
info = parsed_data['info']
if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info:
mint = info['mint']
decimals = info['tokenAmount']['decimals']
amount = float(info['tokenAmount']['amount'])/10**decimals
if amount > 0:
if mint in TOKENS_INFO:
token_name = TOKENS_INFO[mint].get('symbol')
elif doGetTokenName:
token_name = await get_token_metadata_symbol(mint) or 'N/A'
# sleep for 1 second to avoid rate limiting
await asyncio.sleep(2)
TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals)
TOKENS_INFO[mint]['decimals'] = decimals
balances[mint] = {
'name': token_name or 'N/A',
'address': mint,
'amount': amount,
'decimals': decimals
}
# sleep for 1 second to avoid rate limiting
logging.debug(f"Account balance for {token_name} ({mint}): {amount}")
else:
logging.warning(f"Unexpected data format for account: {account}")
except Exception as e:
logging.error(f"Error parsing account data: {str(e)}")
sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address))
if sol_balance.value is not None:
balances['SOL'] = {
'name': 'SOL',
'address': 'SOL',
'amount': sol_balance.value / 1e9
}
else:
logging.warning(f"SOL balance response missing for wallet: {wallet_address}")
except Exception as e:
logging.error(f"Error getting wallet balances: {str(e)}")
logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}")
return balances
async def convert_balances_to_currency(balances , sol_price):
converted_balances = {}
for address, info in balances.items():
converted_balance = info.copy() # Create a copy of the original info
if info['name'] == 'SOL':
converted_balance['value'] = info['amount'] * sol_price
elif address in TOKEN_PRICES:
converted_balance['value'] = info['amount'] * TOKEN_PRICES[address]
else:
converted_balance['value'] = None # Price not available
logging.warning(f"Price not available for token {info['name']} ({address})")
converted_balances[address] = converted_balance
return converted_balances
async def list_initial_wallet_states():
global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES
global TOKENS_INFO # new
followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
all_token_addresses = list(set(followed_wallet_balances.keys()) |
set(your_wallet_balances.keys()) |
set(TOKEN_ADDRESSES.values()))
TOKEN_PRICES = await get_token_prices(all_token_addresses)
sol_price = await get_sol_price()
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price)
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price)
TOKEN_ADDRESSES = {
address: info for address,
info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0
}
logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}")
followed_wallet_state = []
FOLLOWED_WALLET_VALUE = 0
for address, info in followed_converted_balances.items():
if info['value'] is not None and info['value'] > 0:
followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})")
FOLLOWED_WALLET_VALUE += info['value']
your_wallet_state = []
YOUR_WALLET_VALUE = 0
for address, info in your_converted_balances.items():
if info['value'] is not None and info['value'] > 0:
your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}")
YOUR_WALLET_VALUE += info['value']
message = (
f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n"
f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
f"{chr(10).join(followed_wallet_state)}\n"
f"<b>Total Value:</b> {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
f"{chr(10).join(your_wallet_state)}\n"
f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
f"<b>Monitored Tokens:</b>\n"
f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}"
)
logging.info(message)
await telegram_utils.send_telegram_message(message)
# save token info to file
await save_token_info()
#example
# async def main():
# await telegram_utils.initialize()
# async def process_log(log):
# print(f"Processing log: {log}")
# async def list_initial_wallet_states():
# print("Listing initial wallet states")
# wallet_watch_task = asyncio.create_task(solana_api.wallet_watch_loop())
# try:
# await asyncio.gather(wallet_watch_task)
# except asyncio.CancelledError:
# pass
# finally:
# await telegram_utils.close()
# if __name__ == "__main__":
# asyncio.run(main())

View File

@ -28,7 +28,7 @@ class TelegramUtils:
await self.initialize()
try:
await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML)
# await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML)
logging.info(f"Telegram message sent: {message}")
except Exception as e:
logging.error(f"Error sending Telegram message: {str(e)}")