diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 36f7643..8f39c2d 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -7,7 +7,10 @@ from solana.transaction import Signature from solana.rpc.websocket_api import connect from solana.rpc.types import TokenAccountOpts, TxOpts from solana.rpc.commitment import Confirmed -from base58 import b58decode +from solana.transaction import Transaction +from base64 import b64decode +import base58 +from solders.rpc.requests import GetTransaction from solders.signature import Signature from solders.pubkey import Pubkey from solders.keypair import Keypair @@ -16,6 +19,7 @@ from solders.transaction import Transaction from solders.message import Message from solders.instruction import Instruction from solders.hash import Hash +from solders.instruction import CompiledInstruction from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA from dexscreener import DexscreenerClient from telegram import Bot @@ -378,6 +382,76 @@ async def list_initial_wallet_states(): logging.info(message) await send_telegram_message(message) +async def get_swap_transaction_details(tx_signature_str): + t = await solana_client.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0) + try: + transaction = t.value.transaction + message = transaction.transaction.message + instructions = message.instructions + + accounts = t.value.transaction.transaction.message.instructions[0].accounts + instructions = t.value.transaction.transaction.message.instructions + + # Assume the swap is the first instruction + swap_instruction = instructions[0] + + # Extract accounts involved in the swap instruction + accounts = swap_instruction.accounts + + # Initialize result dictionary + parsed_result = { + "order_id": None, + "token_in": None, + "token_out": None, + "amount_in": 0, + "amount_out": 0, + "amount_in_USD": 0, + "amount_out_USD": 0, + "percentage_swapped": 0 + } + + # Extract log messages for order_id and swap details. we have that in the log hook + # log_messages = t.value.meta.log_messages + # for log in log_messages: + # if "order_id" in log: + # parsed_result["order_id"] = log.split(":")[1].strip() + # break + + # Parse the swap instruction to extract token addresses, amounts, and types + for instruction in instructions: + if isinstance(instruction, CompiledInstruction): + if instruction.program_id == Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"): + parsed_info = instruction.parsed.info + mint = parsed_info["mint"] + amount = float(parsed_info["tokenAmount"]["amount"]) / (10 ** parsed_info["tokenAmount"]["decimals"]) + + # Determine token in and token out based on balances + if parsed_result["token_in"] is None and amount > 0: + parsed_result["token_in"] = mint + parsed_result["amount_in"] = amount + elif parsed_result["token_out"] is None: + parsed_result["token_out"] = mint + parsed_result["amount_out"] = amount + + # Calculate USD values if token is USDC + if parsed_result["token_in"] == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v": + parsed_result["amount_in_USD"] = parsed_result["amount_in"] + if parsed_result["token_out"] == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v": + parsed_result["amount_out_USD"] = parsed_result["amount_out"] + + # Calculate percentage swapped + if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0: + parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100 + + return parsed_result + + except Exception as e: + logging.error(f"Error fetching transaction details: {e}") + + return None + + + async def get_transaction_details_rpc(tx_signature, readfromDump=False): url = SOLANA_HTTP_URL @@ -407,12 +481,59 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False): with open('./logs/transation_details.json', 'w') as f: json.dump(transaction_details, f, indent=2) - if 'result' in transaction_details: - #print(transaction_details['result']) - return transaction_details['result'] + result = transaction_details['result'] + + # Initialize default result structure + parsed_result = { + "order_id": None, + "token_in": None, + "token_out": None, + "amount_in": 0, + "amount_out": 0, + "amount_in_USD": 0, + "amount_out_USD": 0, + "percentage_swapped": 0 + } + + # Extract order_id from logs + log_messages = result.get("meta", {}).get("logMessages", []) + for log in log_messages: + if "order_id" in log: + parsed_result["order_id"] = log.split(":")[1].strip() + break + + # Extract token transfers from innerInstructions + inner_instructions = result.get('meta', {}).get('innerInstructions', []) + for instruction_set in inner_instructions: + for instruction in instruction_set.get('instructions', []): + if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked': + info = instruction['parsed']['info'] + mint = info['mint'] + amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals + + # Determine which token is being swapped in and out based on zero balances + if parsed_result["token_in"] is None and amount > 0: + parsed_result["token_in"] = mint + parsed_result["amount_in"] = amount + elif parsed_result["token_out"] is None: + parsed_result["token_out"] = mint + parsed_result["amount_out"] = amount + + # Calculate USD values if token is USDC + if parsed_result["token_in"] == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v": + parsed_result["amount_in_USD"] = parsed_result["amount_in"] + if parsed_result["token_out"] == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v": + parsed_result["amount_out_USD"] = parsed_result["amount_out"] + + # Calculate percentage swapped + if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0: + parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100 + + return parsed_result else: print("Unexpected response:", transaction_details) + return None except requests.exceptions.RequestException as e: print("Error fetching transaction details:", e) @@ -474,6 +595,7 @@ async def process_log(log_result): # "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410", # "Program log: after_source_balance: 0, after_destination_balance: 472509072", # "Program log: source_token_change: 58730110139, destination_token_change: 270131294", + async def parse_swap_logs(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]: token_in = None token_out = None @@ -481,6 +603,11 @@ async def parse_swap_logs(tx_signature_str: str, logs: List[str]) -> Dict[str, A amount_out = 0 order_id = None + try: + token_in, token_out, amount_in, amount_out = await get_swap_transaction_details(tx_signature_str) + except Exception as e: + logging.error(f"Error fetching swap transaction details: {e}") + transaction_details = await get_transaction_details_rpc(tx_signature_str) token_in, token_out, amount_in, amount_out = _extract_token_info(transaction_details) @@ -508,26 +635,7 @@ async def parse_swap_logs(tx_signature_str: str, logs: List[str]) -> Dict[str, A "percentage_swapped": percentage_swapped } -def _extract_token_info(transaction_details: Dict[str, Any]) -> Tuple[str, str, float, float]: - inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) - token_in = None - token_out = None - amount_in = 0 - amount_out = 0 - for instruction_set in inner_instructions: - for instruction in instruction_set.get('instructions', []): - if 'parsed' in instruction and 'info' in instruction['parsed']: - info = instruction['parsed']['info'] - if info.get('type') == 'transfer': - if token_in is None: - token_in = info.get('source') - amount_in = float(info.get('amount', 0)) - else: - token_out = info.get('destination') - amount_out = float(info.get('amount', 0)) - - return token_in, token_out, amount_in, amount_out def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', []) @@ -694,7 +802,7 @@ async def main(): # Initialize logging logging.basicConfig(level=logging.DEBUG) await send_telegram_message("Solana Agent Started. Connecting to mainnet...") - #await subscribe_to_wallet() + await subscribe_to_wallet() def run_flask(): # Run Flask app without the reloader, so we can run the async main function