From 2a15df144cb2160839eb23d720af8679d7c74653 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sat, 5 Oct 2024 23:00:20 +0300 Subject: [PATCH] also get token names --- crypto/sol/app.py | 157 +++++++++++++++++++++++++--------------------- 1 file changed, 84 insertions(+), 73 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index f8e8966..62a4e82 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -34,6 +34,19 @@ import re load_dotenv() app = Flask(__name__) +ENV_FILE = '.env' + +async def save_subscription_id(subscription_id): + # storing subscription id in .env file disabled + #set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id)) + logger.info(f"Saved subscription ID: {subscription_id}") + +async def load_subscription_id(): + subscription_id = os.getenv("SUBSCRIPTION_ID") + return int(subscription_id) if subscription_id else None + + + # Function to find the latest log file def get_latest_log_file(): log_dir = './logs' @@ -103,35 +116,6 @@ async def send_telegram_message(message): logging.error(f"Error sending Telegram message: {str(e)}") -# async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: -# url = "https://api.coingecko.com/api/v3/simple/token_price/solana" -# params = { -# "contract_addresses": ",".join(token_addresses), -# "vs_currencies": DISPLAY_CURRENCY.lower() -# } -# prices = {} - -# async with aiohttp.ClientSession() as session: -# async with session.get(url, params=params) as response: -# if response.status == 200: -# data = await response.json() -# for address, price_info in data.items(): -# if DISPLAY_CURRENCY.lower() in price_info: -# prices[address] = price_info[DISPLAY_CURRENCY.lower()] -# else: -# logging.error(f"Failed to get token prices. Status: {response.status}") - -# # For tokens not found in CoinGecko, try to get price from a DEX or set a default value -# missing_tokens = set(token_addresses) - set(prices.keys()) -# for token in missing_tokens: -# # You might want to implement a fallback method here, such as: -# # prices[token] = await get_price_from_dex(token) -# # For now, we'll set a default value -# prices[token] = 0.0 -# logging.warning(f"Price not found for token {token}. Setting to 0.") - -# return prices - async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: coingecko_prices = await get_prices_from_coingecko(token_addresses) @@ -237,33 +221,6 @@ async def convert_balances_to_currency(balances, token_prices, sol_price): return converted_balances - -async def get_token_balance(wallet_address, token_address): - try: - response = await solana_client.get_token_accounts_by_owner_json_parsed( - Pubkey.from_string(wallet_address), - opts=TokenAccountOpts( - mint=Pubkey.from_string(token_address), - # program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") - ), - commitment=Confirmed - ) - - if response['result']['value']: - balance = await solana_client.get_token_account_balance( - response['result']['value'][0]['pubkey'] - ) - amount = float(balance['result']['value']['uiAmount']) - logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}") - return amount - else: - logging.debug(f"No account found for {token_address} in {wallet_address}") - return 0 - except Exception as e: - logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}") - return 0 - - async def get_token_balance_rpc(wallet_address, token_address): url = SOLANA_HTTP_URL headers = {"Content-Type": "application/json"} @@ -313,16 +270,8 @@ async def get_token_balance_rpc(wallet_address, token_address): except requests.exceptions.RequestException as e: logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}") return 0 -ENV_FILE = '.env' + -async def save_subscription_id(subscription_id): - # storing subscription id in .env file disabled - #set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id)) - logger.info(f"Saved subscription ID: {subscription_id}") - -async def load_subscription_id(): - subscription_id = os.getenv("SUBSCRIPTION_ID") - return int(subscription_id) if subscription_id else None async def get_token_name(mint_address): @@ -356,15 +305,23 @@ async def get_wallet_balances(wallet_address): mint = info['mint'] amount = float(info['tokenAmount']['uiAmount']) if amount > 0: - token_name = await get_token_name(mint) or mint - balances[f"{token_name} ({mint})"] = amount + token_name = await get_token_name(mint) or 'Unknown' + balances[mint] = { + 'name': token_name, + 'address': mint, + 'amount': amount + } logging.debug(f"Balance for {token_name} ({mint}): {amount}") else: logging.warning(f"Unexpected data format for account: {account}") sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) if sol_balance.value is not None: - balances['SOL'] = sol_balance.value / 1e9 + balances['SOL'] = { + 'name': 'SOL', + 'address': 'SOL', + 'amount': sol_balance.value / 1e9 + } else: logging.warning(f"SOL balance response missing for wallet: {wallet_address}") @@ -386,6 +343,49 @@ async def list_initial_wallet_states(): followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, token_prices, sol_price) your_converted_balances = await convert_balances_to_currency(your_wallet_balances, token_prices, sol_price) + TOKEN_ADDRESSES = {token: balance['amount'] for token, balance in {**followed_converted_balances, **your_converted_balances}.items() if balance['amount'] is not None and balance['amount'] > 0} + logging.info(f"Monitoring balances for tokens: {[balance['name'] for balance in TOKEN_ADDRESSES.values()]}") + + followed_wallet_state = [] + FOLLOWED_WALLET_VALUE = 0 + for token, balance in followed_converted_balances.items(): + if balance['amount'] is not None and balance['amount'] > 0: + followed_wallet_state.append(f"{balance['name']} ({balance['address']}): {balance['amount']:.2f} {DISPLAY_CURRENCY}") + FOLLOWED_WALLET_VALUE += balance['amount'] + + your_wallet_state = [] + YOUR_WALLET_VALUE = 0 + for token, balance in your_converted_balances.items(): + if balance['amount'] is not None and balance['amount'] > 0: + your_wallet_state.append(f"{balance['name']} ({balance['address']}): {balance['amount']:.2f} {DISPLAY_CURRENCY}") + YOUR_WALLET_VALUE += balance['amount'] + + message = ( + f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" + f"Followed Wallet ({FOLLOWED_WALLET}):\n" + f"{chr(10).join(followed_wallet_state)}\n" + f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Your Wallet ({YOUR_WALLET}):\n" + f"{chr(10).join(your_wallet_state)}\n" + f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Monitored Tokens:\n" + f"{', '.join([balance['name'] for balance in TOKEN_ADDRESSES.values()])}" + ) + + logging.info(message) + await send_telegram_message(message) + global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE + + followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) + your_wallet_balances = await get_wallet_balances(YOUR_WALLET) + + all_token_addresses = list(set(followed_wallet_balances.keys()) | set(your_wallet_balances.keys())) + token_prices = await get_token_prices(all_token_addresses) + sol_price = await get_sol_price() + + followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, token_prices, sol_price) + your_converted_balances = await convert_balances_to_currency(your_wallet_balances, token_prices, sol_price) + TOKEN_ADDRESSES = {token: amount for token, amount in {**followed_converted_balances, **your_converted_balances}.items() if amount is not None and amount > 0} logging.info(f"Monitoring balances for tokens: {TOKEN_ADDRESSES.keys()}") @@ -574,7 +574,16 @@ async def parse_swap_logs(logs): async def follow_move(move): your_balances = await get_wallet_balances(YOUR_WALLET) - your_balance = your_balances.get(move['token_in'], 0) + your_balance_info = your_balances.get(move['token_in']) + + if not your_balance_info: + message = f"Move Failed:\nNo balance found for token {move['token_in']}" + logging.warning(message) + await send_telegram_message(message) + return + + your_balance = your_balance_info['amount'] + token_name = your_balance_info['name'] # Calculate the amount to swap based on the same percentage as the followed move amount_to_swap = your_balance * (move['percentage_swapped'] / 100) @@ -599,11 +608,14 @@ async def follow_move(move): result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts) transaction_id = json.loads(result.to_json())['result'] + output_token_info = your_balances.get(move['token_out'], {'name': 'Unknown'}) + output_token_name = output_token_info['name'] + message = ( f"Move Followed:\n" - f"Swapped {amount_to_swap:.6f} {move['token_in']} " + f"Swapped {amount_to_swap:.6f} {token_name} ({move['token_in']}) " f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n" - f"for {transaction_data['outputAmount'] / 1e6:.6f} {move['token_out']}" + f"for {transaction_data['outputAmount'] / 1e6:.6f} {output_token_name} ({move['token_out']})" ) logging.info(message) await send_telegram_message(message) @@ -614,11 +626,10 @@ async def follow_move(move): else: message = ( f"Move Failed:\n" - f"Insufficient balance to swap {amount_to_swap:.6f} {move['token_in']}" + f"Insufficient balance to swap {amount_to_swap:.6f} {token_name} ({move['token_in']})" ) logging.warning(message) await send_telegram_message(message) - # Helper functions (implement these according to your needs)