proceed rtying to parse solana tr logs

This commit is contained in:
Dobromir Popov 2024-10-07 00:16:09 +03:00
parent 774bd333ef
commit 14f3674da0

View File

@ -7,7 +7,10 @@ from solana.transaction import Signature
from solana.rpc.websocket_api import connect from solana.rpc.websocket_api import connect
from solana.rpc.types import TokenAccountOpts, TxOpts from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.commitment import Confirmed from solana.rpc.commitment import Confirmed
from base58 import b58decode from solana.transaction import Transaction
from base64 import b64decode
import base58
from solders.rpc.requests import GetTransaction
from solders.signature import Signature from solders.signature import Signature
from solders.pubkey import Pubkey from solders.pubkey import Pubkey
from solders.keypair import Keypair from solders.keypair import Keypair
@ -16,6 +19,7 @@ from solders.transaction import Transaction
from solders.message import Message from solders.message import Message
from solders.instruction import Instruction from solders.instruction import Instruction
from solders.hash import Hash from solders.hash import Hash
from solders.instruction import CompiledInstruction
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
from dexscreener import DexscreenerClient from dexscreener import DexscreenerClient
from telegram import Bot from telegram import Bot
@ -378,6 +382,76 @@ async def list_initial_wallet_states():
logging.info(message) logging.info(message)
await send_telegram_message(message) await send_telegram_message(message)
async def get_swap_transaction_details(tx_signature_str):
t = await solana_client.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0)
try:
transaction = t.value.transaction
message = transaction.transaction.message
instructions = message.instructions
accounts = t.value.transaction.transaction.message.instructions[0].accounts
instructions = t.value.transaction.transaction.message.instructions
# Assume the swap is the first instruction
swap_instruction = instructions[0]
# Extract accounts involved in the swap instruction
accounts = swap_instruction.accounts
# Initialize result dictionary
parsed_result = {
"order_id": None,
"token_in": None,
"token_out": None,
"amount_in": 0,
"amount_out": 0,
"amount_in_USD": 0,
"amount_out_USD": 0,
"percentage_swapped": 0
}
# Extract log messages for order_id and swap details. we have that in the log hook
# log_messages = t.value.meta.log_messages
# for log in log_messages:
# if "order_id" in log:
# parsed_result["order_id"] = log.split(":")[1].strip()
# break
# Parse the swap instruction to extract token addresses, amounts, and types
for instruction in instructions:
if isinstance(instruction, CompiledInstruction):
if instruction.program_id == Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"):
parsed_info = instruction.parsed.info
mint = parsed_info["mint"]
amount = float(parsed_info["tokenAmount"]["amount"]) / (10 ** parsed_info["tokenAmount"]["decimals"])
# Determine token in and token out based on balances
if parsed_result["token_in"] is None and amount > 0:
parsed_result["token_in"] = mint
parsed_result["amount_in"] = amount
elif parsed_result["token_out"] is None:
parsed_result["token_out"] = mint
parsed_result["amount_out"] = amount
# Calculate USD values if token is USDC
if parsed_result["token_in"] == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v":
parsed_result["amount_in_USD"] = parsed_result["amount_in"]
if parsed_result["token_out"] == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v":
parsed_result["amount_out_USD"] = parsed_result["amount_out"]
# Calculate percentage swapped
if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0:
parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100
return parsed_result
except Exception as e:
logging.error(f"Error fetching transaction details: {e}")
return None
async def get_transaction_details_rpc(tx_signature, readfromDump=False): async def get_transaction_details_rpc(tx_signature, readfromDump=False):
url = SOLANA_HTTP_URL url = SOLANA_HTTP_URL
@ -407,12 +481,59 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False):
with open('./logs/transation_details.json', 'w') as f: with open('./logs/transation_details.json', 'w') as f:
json.dump(transaction_details, f, indent=2) json.dump(transaction_details, f, indent=2)
if 'result' in transaction_details: if 'result' in transaction_details:
#print(transaction_details['result']) result = transaction_details['result']
return transaction_details['result']
# Initialize default result structure
parsed_result = {
"order_id": None,
"token_in": None,
"token_out": None,
"amount_in": 0,
"amount_out": 0,
"amount_in_USD": 0,
"amount_out_USD": 0,
"percentage_swapped": 0
}
# Extract order_id from logs
log_messages = result.get("meta", {}).get("logMessages", [])
for log in log_messages:
if "order_id" in log:
parsed_result["order_id"] = log.split(":")[1].strip()
break
# Extract token transfers from innerInstructions
inner_instructions = result.get('meta', {}).get('innerInstructions', [])
for instruction_set in inner_instructions:
for instruction in instruction_set.get('instructions', []):
if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked':
info = instruction['parsed']['info']
mint = info['mint']
amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals
# Determine which token is being swapped in and out based on zero balances
if parsed_result["token_in"] is None and amount > 0:
parsed_result["token_in"] = mint
parsed_result["amount_in"] = amount
elif parsed_result["token_out"] is None:
parsed_result["token_out"] = mint
parsed_result["amount_out"] = amount
# Calculate USD values if token is USDC
if parsed_result["token_in"] == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v":
parsed_result["amount_in_USD"] = parsed_result["amount_in"]
if parsed_result["token_out"] == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v":
parsed_result["amount_out_USD"] = parsed_result["amount_out"]
# Calculate percentage swapped
if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0:
parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100
return parsed_result
else: else:
print("Unexpected response:", transaction_details) print("Unexpected response:", transaction_details)
return None
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
print("Error fetching transaction details:", e) print("Error fetching transaction details:", e)
@ -474,6 +595,7 @@ async def process_log(log_result):
# "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410", # "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410",
# "Program log: after_source_balance: 0, after_destination_balance: 472509072", # "Program log: after_source_balance: 0, after_destination_balance: 472509072",
# "Program log: source_token_change: 58730110139, destination_token_change: 270131294", # "Program log: source_token_change: 58730110139, destination_token_change: 270131294",
async def parse_swap_logs(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]: async def parse_swap_logs(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]:
token_in = None token_in = None
token_out = None token_out = None
@ -481,6 +603,11 @@ async def parse_swap_logs(tx_signature_str: str, logs: List[str]) -> Dict[str, A
amount_out = 0 amount_out = 0
order_id = None order_id = None
try:
token_in, token_out, amount_in, amount_out = await get_swap_transaction_details(tx_signature_str)
except Exception as e:
logging.error(f"Error fetching swap transaction details: {e}")
transaction_details = await get_transaction_details_rpc(tx_signature_str) transaction_details = await get_transaction_details_rpc(tx_signature_str)
token_in, token_out, amount_in, amount_out = _extract_token_info(transaction_details) token_in, token_out, amount_in, amount_out = _extract_token_info(transaction_details)
@ -508,26 +635,7 @@ async def parse_swap_logs(tx_signature_str: str, logs: List[str]) -> Dict[str, A
"percentage_swapped": percentage_swapped "percentage_swapped": percentage_swapped
} }
def _extract_token_info(transaction_details: Dict[str, Any]) -> Tuple[str, str, float, float]:
inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
token_in = None
token_out = None
amount_in = 0
amount_out = 0
for instruction_set in inner_instructions:
for instruction in instruction_set.get('instructions', []):
if 'parsed' in instruction and 'info' in instruction['parsed']:
info = instruction['parsed']['info']
if info.get('type') == 'transfer':
if token_in is None:
token_in = info.get('source')
amount_in = float(info.get('amount', 0))
else:
token_out = info.get('destination')
amount_out = float(info.get('amount', 0))
return token_in, token_out, amount_in, amount_out
def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', []) pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', [])
@ -694,7 +802,7 @@ async def main():
# Initialize logging # Initialize logging
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
await send_telegram_message("Solana Agent Started. Connecting to mainnet...") await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
#await subscribe_to_wallet() await subscribe_to_wallet()
def run_flask(): def run_flask():
# Run Flask app without the reloader, so we can run the async main function # Run Flask app without the reloader, so we can run the async main function