From b5bc6505a270d5aeff4d00f4b45efe6e98e92932 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sat, 5 Oct 2024 20:36:25 +0300 Subject: [PATCH] transaction parse progress --- crypto/sol/app.py | 126 +++++++++++++++++++++++++++++----------------- 1 file changed, 80 insertions(+), 46 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index f5ca76f..c8cfb1f 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -253,7 +253,8 @@ async def get_token_balance(wallet_address, token_address): ENV_FILE = '.env' async def save_subscription_id(subscription_id): - set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id)) + # storing subscription id in .env file disabled + #set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id)) logger.info(f"Saved subscription ID: {subscription_id}") async def load_subscription_id(): @@ -486,7 +487,7 @@ from solders.pubkey import Pubkey from solders.transaction import Transaction from solders.signature import Signature -async def get_transaction_details_rpc(tx_signature): +async def get_transaction_details_rpc(tx_signature, readfromDump=False): url = SOLANA_HTTP_URL # url = 'https://solana.drpc.org' headers = {"Content-Type": "application/json"} @@ -503,9 +504,17 @@ async def get_transaction_details_rpc(tx_signature): ] } try: - response = requests.post(url, headers=headers, data=json.dumps(data)) - response.raise_for_status() # Raises an error for bad responses - transaction_details = response.json() + if readfromDump and os.path.exists('./logs/transation_details.json'): + with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details + transaction_details = json.load(f) + return transaction_details + else: + response = requests.post(url, headers=headers, data=json.dumps(data)) + response.raise_for_status() # Raises an error for bad responses + transaction_details = response.json() + with open('./logs/transation_details.json', 'w') as f: + json.dump(transaction_details, f, indent=2) + if 'result' in transaction_details: print(transaction_details['result']) @@ -584,33 +593,67 @@ async def process_log(log_result): # -- OLD using solana-py watched_tokens = await get_non_zero_token_balances(FOLLOWED_WALLET) - details = parse_swap_logs(logs) - transaction = await get_transaction_details_rpc(tx_signature_str) + details = parse_swap_logs(logs, watched_tokens) + transaction = await get_transaction_details_rpc(tx_signature_str, True) - instructions = transaction['transaction']['message']['instructions'] + # instructions = transaction['transaction']['message']['instructions'] - for instruction in instructions: - from_pubkey, to_pubkey, amount_in, amount_out = extract_swap_details(instruction, logs, watched_tokens) + # for instruction in instructions: + # from_pubkey, to_pubkey, amount_in, amount_out = extract_swap_details(instruction, logs, watched_tokens) - if from_pubkey in watched_tokens.values() or to_pubkey in watched_tokens.values(): - from_token = determine_token(from_pubkey, watched_tokens) - to_token = determine_token(to_pubkey, watched_tokens) + # if from_pubkey in watched_tokens.values() or to_pubkey in watched_tokens.values(): + # from_token = determine_token(from_pubkey, watched_tokens) + # to_token = determine_token(to_pubkey, watched_tokens) - move = { - 'token': from_token, - 'amount': amount_in, - 'to_token': to_token - } - message_text = ( - f"Swap detected:\n" - f"From: {from_pubkey} ({from_token})\n" - f"To: {to_pubkey} ({to_token})\n" - f"Amount In: {amount_in}\n" - f"Amount Out: {amount_out}" - ) - await send_telegram_message(message_text) - await follow_move(move) - + # move = { + # 'token': from_token, + # 'amount': amount_in, + # 'to_token': to_token + # } + # message_text = ( + # f"Swap detected:\n" + # f"From: {from_pubkey} ({from_token})\n" + # f"To: {to_pubkey} ({to_token})\n" + # f"Amount In: {amount_in}\n" + # f"Amount Out: {amount_out}" + # ) + # await send_telegram_message(message_text) + # await follow_move(move) + + tokens = [] + + # Check inner instructions for transfers and mints + for instruction_set in transaction.get('meta', {}).get('innerInstructions', []): + for instruction in instruction_set.get('instructions', []): + if 'parsed' in instruction and 'info' in instruction['parsed']: + info = instruction['parsed']['info'] + if 'amount' in info: + amount = info['amount'] + # Assume mint is available for mintTo + mint = info.get('mint', 'Unknown') + tokens.append({'amount': amount, 'mint': mint}) + + # Check post token balances for final token states + for balance in transaction.get('postTokenBalances', []): + amount = balance['uiTokenAmount']['amount'] + mint = balance['mint'] + tokens.append({'amount': amount, 'mint': mint}) + + # get amount_in, amount_out and token in and token out and USD value + swap_details = { + 'amount_in': details['total_amount_in'], + 'amount_out': details['total_amount_out'], + 'tokens': tokens + } + + message_text = ( + f"Swap detected:\n" + f"Amount In: {swap_details['amount_in']}\n" + f"Amount Out: {swap_details['amount_out']}" + ) + + await send_telegram_message(message_text) + # await follow_move(move) except Exception as e: logging.error(f"Error fetching transaction details: {e}") @@ -619,12 +662,10 @@ async def process_log(log_result): except Exception as e: logging.error(f"Error processing log: {e}") - -def parse_swap_logs(logs): +def parse_swap_logs(logs, watched_tokens): total_amount_in = 0 total_amount_out = 0 - source_token_address = "" - destination_token_address = "" + token_addresses = [] for log in logs: if "SwapEvent" in log: @@ -639,26 +680,19 @@ def parse_swap_logs(logs): total_amount_out += int(event_details.get("amount_out", 0)) if "source_token_change:" in log: - # Extract final source and destination token addresses + # Extract final source and destination token changes changes = log.split(", ") - for change in changes.Trim('Program log:'): - key, value = change.split(": ") + for change in changes: + key_value = change.split(": ", 1) + if len(key_value) != 2: + continue + key, value = key_value if key == "source_token_change": total_amount_in = int(value) elif key == "destination_token_change": total_amount_out = int(value) - if "Program log:" in log and len(log.split()) == 2: - # Extract token addresses - token_address = log.split(": ")[1] - if not source_token_address: - source_token_address = token_address - elif not destination_token_address: - destination_token_address = token_address - return { - "source_token_address": source_token_address, - "destination_token_address": destination_token_address, "total_amount_in": total_amount_in, "total_amount_out": total_amount_out, } @@ -750,7 +784,7 @@ async def main(): # Initialize logging logging.basicConfig(level=logging.DEBUG) await send_telegram_message("Solana Agent Started. Connecting to mainnet...") - await subscribe_to_wallet() + # await subscribe_to_wallet() def run_flask(): # Run Flask app without the reloader, so we can run the async main function