From 774bd333efdd4418f27cb52df985f9a150c0b744 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sun, 6 Oct 2024 01:05:50 +0300 Subject: [PATCH] tick --- crypto/sol/app.py | 153 +++++++++++++++++++++++++--------------------- 1 file changed, 84 insertions(+), 69 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 59d008c..36f7643 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -30,6 +30,7 @@ from typing import List, Dict import requests import threading import re +from typing import List, Dict, Any, Tuple load_dotenv() app = Flask(__name__) @@ -208,20 +209,6 @@ async def get_sol_price() -> float: logging.error(f"Failed to get SOL price. Status: {response.status}") return None -async def convert_balances_to_currency(balances, token_prices, sol_price): - converted_balances = {} - for address, info in balances.items(): - converted_balance = info.copy() # Create a copy of the original info - if info['name'] == 'SOL': - converted_balance['value'] = info['amount'] * sol_price - elif address in token_prices: - converted_balance['value'] = info['amount'] * token_prices[address] - else: - converted_balance['value'] = None # Price not available - logging.warning(f"Price not available for token {info['name']} ({address})") - converted_balances[address] = converted_balance - return converted_balances - async def get_token_balance_rpc(wallet_address, token_address): url = SOLANA_HTTP_URL @@ -332,6 +319,20 @@ async def get_wallet_balances(wallet_address): return balances +async def convert_balances_to_currency(balances, token_prices, sol_price): + converted_balances = {} + for address, info in balances.items(): + converted_balance = info.copy() # Create a copy of the original info + if info['name'] == 'SOL': + converted_balance['value'] = info['amount'] * sol_price + elif address in token_prices: + converted_balance['value'] = info['amount'] * token_prices[address] + else: + converted_balance['value'] = None # Price not available + logging.warning(f"Price not available for token {info['name']} ({address})") + converted_balances[address] = converted_balance + return converted_balances + async def list_initial_wallet_states(): global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE @@ -345,22 +346,22 @@ async def list_initial_wallet_states(): followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, token_prices, sol_price) your_converted_balances = await convert_balances_to_currency(your_wallet_balances, token_prices, sol_price) - TOKEN_ADDRESSES = {token: balance['amount'] for token, balance in {**followed_converted_balances, **your_converted_balances}.items() if balance['amount'] is not None and balance['amount'] > 0} - logging.info(f"Monitoring balances for tokens: {[balance['name'] for balance in TOKEN_ADDRESSES.values()]}") + TOKEN_ADDRESSES = {address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0} + logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") followed_wallet_state = [] FOLLOWED_WALLET_VALUE = 0 - for token, balance in followed_converted_balances.items(): - if balance['amount'] is not None and balance['amount'] > 0: - followed_wallet_state.append(f"{balance['name']} ({balance['address']}): {balance['amount']:.2f} {DISPLAY_CURRENCY}") - FOLLOWED_WALLET_VALUE += balance['amount'] + for address, info in followed_converted_balances.items(): + if info['value'] is not None and info['value'] > 0: + followed_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}") + FOLLOWED_WALLET_VALUE += info['value'] your_wallet_state = [] YOUR_WALLET_VALUE = 0 - for token, balance in your_converted_balances.items(): - if balance['amount'] is not None and balance['amount'] > 0: - your_wallet_state.append(f"{balance['name']} ({balance['address']}): {balance['amount']:.2f} {DISPLAY_CURRENCY}") - YOUR_WALLET_VALUE += balance['amount'] + for address, info in your_converted_balances.items(): + if info['value'] is not None and info['value'] > 0: + your_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}") + YOUR_WALLET_VALUE += info['value'] message = ( f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" @@ -371,12 +372,13 @@ async def list_initial_wallet_states(): f"{chr(10).join(your_wallet_state)}\n" f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" f"Monitored Tokens:\n" - f"{', '.join([balance['name'] for balance in TOKEN_ADDRESSES.values()])}" + f"{', '.join([info['name'] for info in TOKEN_ADDRESSES.values()])}" ) logging.info(message) await send_telegram_message(message) + async def get_transaction_details_rpc(tx_signature, readfromDump=False): url = SOLANA_HTTP_URL # url = 'https://solana.drpc.org' @@ -407,7 +409,7 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False): if 'result' in transaction_details: - print(transaction_details['result']) + #print(transaction_details['result']) return transaction_details['result'] else: print("Unexpected response:", transaction_details) @@ -432,7 +434,7 @@ async def process_log(log_result): if log_result['value']['err']: return - tx_signature_str = log_result['value']['signature'] + logs = log_result['value']['logs'] try: # Detect swap operations in logs @@ -441,7 +443,8 @@ async def process_log(log_result): for log_entry in logs: if any(op in log_entry for op in swap_operations): try: - details = await parse_swap_logs(logs) + tx_signature_str = log_result['value']['signature'] + details = await parse_swap_logs(tx_signature_str, logs) message_text = ( f"Swap detected:\n" @@ -471,49 +474,25 @@ async def process_log(log_result): # "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410", # "Program log: after_source_balance: 0, after_destination_balance: 472509072", # "Program log: source_token_change: 58730110139, destination_token_change: 270131294", -async def parse_swap_logs(logs): +async def parse_swap_logs(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]: token_in = None token_out = None amount_in = 0 - amount_out_expected = 0 - amount_out_actual = 0 + amount_out = 0 order_id = None - before_source_balance = 0 - for log in logs: - if "Program log:" in log: - if "order_id:" in log: - order_id = log.split("order_id: ")[-1].strip() - elif "Swap2" in log: - token_in = None - token_out = None - elif not token_in: - token_in = log.split("Program log: ")[-1].strip() - elif not token_out: - token_out = log.split("Program log: ")[-1].strip() - - if "before_source_balance:" in log: - before_source_balance = int(re.search(r"before_source_balance: (\d+)", log).group(1)) - - if "amount_in" in log or "amount_out" in log: - amount_matches = re.findall(r"(amount_in|amount_out): (\d+)", log) - for amount_type, value in amount_matches: - if amount_type == "amount_in": - amount_in = int(value) - elif amount_type == "amount_out": - amount_out_expected = int(value) - - elif "source_token_change:" in log or "destination_token_change:" in log: - changes = log.split(", ") - for change in changes: - if "source_token_change" in change: - amount_in = int(change.split(": ")[-1]) - elif "destination_token_change" in change: - amount_out_actual = int(change.split(": ")[-1]) + transaction_details = await get_transaction_details_rpc(tx_signature_str) + token_in, token_out, amount_in, amount_out = _extract_token_info(transaction_details) + # Fetch token prices token_prices = await get_token_prices([token_in, token_out]) - amount_in_usd = amount_in / 1e6 * token_prices.get(token_in, 0) - amount_out_usd = amount_out_actual / 1e6 * token_prices.get(token_out, 0) + + # Calculate USD values + amount_in_usd = amount_in * token_prices.get(token_in, 0) + amount_out_usd = amount_out * token_prices.get(token_out, 0) + + # Get the pre-balance of the input token + before_source_balance = _get_pre_balance(transaction_details, token_in) # Calculate the percentage of the source balance that was swapped percentage_swapped = (amount_in / before_source_balance) * 100 if before_source_balance > 0 else 0 @@ -522,14 +501,48 @@ async def parse_swap_logs(logs): "order_id": order_id, "token_in": token_in, "token_out": token_out, - "amount_in": amount_in / 1e6, - "amount_out_expected": amount_out_expected / 1e6, - "amount_out_actual": amount_out_actual / 1e6, + "amount_in": amount_in, + "amount_out": amount_out, "amount_in_USD": amount_in_usd, "amount_out_USD": amount_out_usd, "percentage_swapped": percentage_swapped } +def _extract_token_info(transaction_details: Dict[str, Any]) -> Tuple[str, str, float, float]: + inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) + token_in = None + token_out = None + amount_in = 0 + amount_out = 0 + + for instruction_set in inner_instructions: + for instruction in instruction_set.get('instructions', []): + if 'parsed' in instruction and 'info' in instruction['parsed']: + info = instruction['parsed']['info'] + if info.get('type') == 'transfer': + if token_in is None: + token_in = info.get('source') + amount_in = float(info.get('amount', 0)) + else: + token_out = info.get('destination') + amount_out = float(info.get('amount', 0)) + + return token_in, token_out, amount_in, amount_out + +def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: + pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', []) + for balance in pre_balances: + if balance['mint'] == token: + return float(balance['uiTokenAmount']['amount']) + return 0.0 +def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: + pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', []) + for balance in pre_balances: + if balance['mint'] == token: + return float(balance['uiTokenAmount']['amount']) + return 0.0 + + async def follow_move(move): your_balances = await get_wallet_balances(YOUR_WALLET) your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) @@ -609,6 +622,9 @@ async def subscribe_to_wallet(): reconnect_delay = 5 # Start with a 5-second delay max_reconnect_delay = 60 # Maximum delay of 60 seconds + + await list_initial_wallet_states() + while True: try: async with websockets.connect(uri) as websocket: @@ -644,7 +660,6 @@ async def subscribe_to_wallet(): await save_subscription_id(subscription_id) logger.info(f"Subscription successful. Subscription id: {subscription_id}") await send_telegram_message("Connected to Solana network. Watching for transactions now.") - await list_initial_wallet_states() elif 'params' in response_data: await on_logs(response_data['params']['result']) @@ -679,7 +694,7 @@ async def main(): # Initialize logging logging.basicConfig(level=logging.DEBUG) await send_telegram_message("Solana Agent Started. Connecting to mainnet...") - await subscribe_to_wallet() + #await subscribe_to_wallet() def run_flask(): # Run Flask app without the reloader, so we can run the async main function