succesfully parsing swaps !

follow move has all the needed parameters now.
This commit is contained in:
Dobromir Popov 2024-10-07 12:56:32 +03:00
parent a6f43b8a9a
commit a6ca2e3886

View File

@ -6,7 +6,7 @@ from solana.rpc.async_api import AsyncClient
from solana.transaction import Signature
from solana.rpc.websocket_api import connect
from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.commitment import Confirmed
from solana.rpc.commitment import Confirmed, Processed
from solana.transaction import Transaction
from base64 import b64decode
import base58
@ -20,6 +20,7 @@ from solders.message import Message
from solders.instruction import Instruction
from solders.hash import Hash
from solders.instruction import CompiledInstruction
from solders import message
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
from dexscreener import DexscreenerClient
from telegram import Bot
@ -77,12 +78,13 @@ def retry_last_log():
with open(latest_log_file, 'r') as f:
log = json.load(f)
# await process_log(log)
# Run the asynchronous process_log function
asyncio.run(process_log(log))
return jsonify({"status": "Log processed successfully"}), 200
return jsonify({"status": "Log dump processed successfully"}), 200
except Exception as e:
logging.error(f"Error processing log: {e}")
logging.error(f"Error processing log dump: {e}")
return jsonify({"error": "Failed to process log"}), 500
@ -202,18 +204,6 @@ async def get_sol_price_from_dexscreener() -> float:
prices = await get_prices_from_dexscreener([sol_address])
return prices.get(sol_address, 0.0)
async def get_sol_price() -> float:
url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
return data['solana'][DISPLAY_CURRENCY.lower()]
else:
logging.error(f"Failed to get SOL price. Status: {response.status}")
return None
async def get_token_balance_rpc(wallet_address, token_address):
url = SOLANA_HTTP_URL
headers = {"Content-Type": "application/json"}
@ -324,32 +314,33 @@ async def get_wallet_balances(wallet_address):
return balances
async def convert_balances_to_currency(balances, token_prices, sol_price):
async def convert_balances_to_currency(balances , sol_price):
converted_balances = {}
for address, info in balances.items():
converted_balance = info.copy() # Create a copy of the original info
if info['name'] == 'SOL':
converted_balance['value'] = info['amount'] * sol_price
elif address in token_prices:
converted_balance['value'] = info['amount'] * token_prices[address]
elif address in TOKEN_PRICES:
converted_balance['value'] = info['amount'] * TOKEN_PRICES[address]
else:
converted_balance['value'] = None # Price not available
logging.warning(f"Price not available for token {info['name']} ({address})")
converted_balances[address] = converted_balance
return converted_balances
async def list_initial_wallet_states():
global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE
global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES
followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
all_token_addresses = list(set(followed_wallet_balances.keys()) | set(your_wallet_balances.keys()))
token_prices = await get_token_prices(all_token_addresses)
TOKEN_PRICES = await get_token_prices(all_token_addresses)
sol_price = await get_sol_price()
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, token_prices, sol_price)
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, token_prices, sol_price)
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price)
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price)
TOKEN_ADDRESSES = {address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0}
logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}")
@ -386,20 +377,6 @@ async def list_initial_wallet_states():
async def get_swap_transaction_details(tx_signature_str):
t = await solana_client.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0)
try:
transaction = t.value.transaction
message = transaction.transaction.message
instructions = message.instructions
accounts = t.value.transaction.transaction.message.instructions[0].accounts
instructions = t.value.transaction.transaction.message.instructions
# Assume the swap is the first instruction
swap_instruction = instructions[0]
# Extract accounts involved in the swap instruction
accounts = swap_instruction.accounts
# Initialize result dictionary
parsed_result = {
"order_id": None,
"token_in": None,
@ -418,6 +395,7 @@ async def get_swap_transaction_details(tx_signature_str):
# parsed_result["order_id"] = log.split(":")[1].strip()
# break
instructions = t.value.transaction.transaction.message.instructions
# Parse the swap instruction to extract token addresses, amounts, and types
for instruction in instructions:
if isinstance(instruction, CompiledInstruction):
@ -455,84 +433,157 @@ async def get_swap_transaction_details(tx_signature_str):
async def get_transaction_details_rpc(tx_signature, readfromDump=False):
url = SOLANA_HTTP_URL
# url = 'https://solana.drpc.org'
headers = {"Content-Type": "application/json"}
data = {
"jsonrpc": "2.0",
"id": 1,
"method": "getTransaction",
"params": [
tx_signature,
{
"encoding": "jsonParsed",
"maxSupportedTransactionVersion": 0
}
]
}
try:
if readfromDump and os.path.exists('./logs/transation_details.json'):
with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details
transaction_details = json.load(f)
return transaction_details
else:
response = requests.post(url, headers=headers, data=json.dumps(data))
response.raise_for_status() # Raises an error for bad responses
transaction_details = response.json()
transaction_details = await solana_jsonrpc("getTransaction", [tx_signature])
with open('./logs/transation_details.json', 'w') as f:
json.dump(transaction_details, f, indent=2)
if 'result' in transaction_details:
result = transaction_details['result']
# Initialize default result structure
parsed_result = {
"order_id": None,
"token_in": None,
"token_out": None,
"amount_in": 0,
"amount_out": 0,
"amount_in_USD": 0,
"amount_out_USD": 0,
"percentage_swapped": 0
}
# Initialize default result structure
parsed_result = {
"order_id": None,
"token_in": None,
"token_out": None,
"amount_in": 0,
"amount_out": 0,
"amount_in_USD": 0,
"amount_out_USD": 0,
"percentage_swapped": 0
}
# Extract order_id from logs
log_messages = transaction_details.get("meta", {}).get("logMessages", [])
for log in log_messages:
if "order_id" in log:
parsed_result["order_id"] = log.split(":")[2].strip()
break
# Extract order_id from logs
log_messages = result.get("meta", {}).get("logMessages", [])
for log in log_messages:
if "order_id" in log:
parsed_result["order_id"] = log.split(":")[2].strip()
break
# Extract token transfers from innerInstructions
inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
for instruction_set in inner_instructions:
for instruction in instruction_set.get('instructions', []):
if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked':
info = instruction['parsed']['info']
mint = info['mint']
amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals
# Determine which token is being swapped in and out based on zero balances
if parsed_result["token_in"] is None and amount > 0:
parsed_result["token_in"] = mint
parsed_result["amount_in"] = amount
if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
# if we've failed to extract token_in and token_out from the transaction details, try a second method
inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
transfers = []
# Extract token transfers from innerInstructions
inner_instructions = result.get('meta', {}).get('innerInstructions', [])
for instruction_set in inner_instructions:
for instruction in instruction_set.get('instructions', []):
if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked':
if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']:
info = instruction['parsed']['info']
mint = info['mint']
amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals
amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount'])
decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0
adjusted_amount = amount / (10 ** decimals)
transfers.append({
'mint': info.get('mint'),
'amount': adjusted_amount,
'source': info['source'],
'destination': info['destination']
})
# Determine which token is being swapped in and out based on zero balances
if parsed_result["token_in"] is None and amount > 0:
parsed_result["token_in"] = mint
parsed_result["amount_in"] = amount
# Identify token_in and token_out
if len(transfers) >= 2:
parsed_result["token_in"] = transfers[0]['mint']
parsed_result["amount_in"] = transfers[0]['amount']
parsed_result["token_out"] = transfers[-1]['mint']
parsed_result["amount_out"] = transfers[-1]['amount']
# If mint is not provided, query the Solana network for the account data
if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
#for transfer in transfers:
# do only first and last transfer
for transfer in [transfers[0], transfers[-1]]:
if transfer['mint'] is None:
# Query the Solana network for the account data
account_data_result = await solana_jsonrpc("getAccountInfo", [transfer['source']])
if 'value' in account_data_result and 'data' in account_data_result['value']:
account_data_value = account_data_result['value']
account_data_data = account_data_value['data']
if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
account_data_info = account_data_data['parsed']['info']
if 'mint' in account_data_info:
transfer['mint'] = account_data_info['mint']
if parsed_result["token_in"] is None:
parsed_result["token_in"] = transfer['mint']
parsed_result["amount_in"] = transfer['amount']/10**6
elif parsed_result["token_out"] is None:
parsed_result["token_out"] = transfer['mint']
parsed_result["amount_out"] = transfer['amount']/10**6
pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', [])
for balance in pre_balalnces:
if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET:
parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals']
break
# Calculate USD values if token is USDC
# Calculate percentage swapped
if parsed_result["amount_in"] > 0 and parsed_result["before_source_balance"] > 0:
parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100
# Calculate percentage swapped
if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0:
parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100
return parsed_result
return parsed_result
else:
print("Unexpected response:", transaction_details)
return None
except requests.exceptions.RequestException as e:
print("Error fetching transaction details:", e)
async def solana_jsonrpc(method, params = None, jsonParsed = True):
# target json example:
# data = {
# "jsonrpc": "2.0",
# "id": 1,
# "method": "getTransaction",
# "params": [
# tx_signature,
# {
# "encoding": "jsonParsed",
# "maxSupportedTransactionVersion": 0
# }
# ]
# }
data = {
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params or []
}
data["params"].append({"maxSupportedTransactionVersion": 0})
if jsonParsed:
data["params"][1]["encoding"] = "jsonParsed"
try:
# url = 'https://solana.drpc.org'
response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data))
response.raise_for_status() # Raises an error for bad responses
result = response.json()
if not 'result' in result or 'error' in result:
print("Error fetching data from Solana RPC:", result)
return None
return result['result']
except Exception as e:
logging.error(f"Error fetching data from Solana RPC: {e}")
return None
async def save_log(log):
try:
os.makedirs('./logs', exist_ok=True)
@ -612,23 +663,23 @@ async def process_log(log_result):
source_token_change = float(part.split(":")[-1].strip()) / 10 ** 6
if tr_details["order_id"] is None or tr_details["token_in"] is None or tr_details["token_out"] is None or tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0:
# GET DETAILS FROM TRANSACTION IF NOT FOUND IN LOGS
if tr_details["token_in"] is None or tr_details["token_out"] is None or tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0:
logging.warning("Incomplete swap details found in logs. Getting details from transaction")
details = await get_transaction_details_info(tx_signature_str, logs)
tr_details = await get_transaction_details_info(tx_signature_str, logs)
# onlt needed if no details got
if before_source_balance > 0 and source_token_change > 0:
tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100
if tr_details["order_id"] is not None :
message_text = (
f"Swap detected:\n"
f"Order ID: {tr_details['order_id']}\n"
f"Token In: {tr_details['token_in']}\n"
f"Token Out: {tr_details['token_out']}\n"
f"Amount In USD: {tr_details['amount_in_USD']}\n"
f"Percentage Swapped: {tr_details['percentage_swapped']:.2f}%"
)
message_text = (
f"Swap detected:\n"
f"Order ID: {tr_details['order_id']}\n"
f"Token In: {tr_details['token_in']}\n"
f"Token Out: {tr_details['token_out']}\n"
f"Amount In USD: {tr_details['amount_in_USD']}\n"
f"Percentage Swapped: {tr_details['percentage_swapped']:.2f}%"
)
await send_telegram_message(message_text)
await follow_move(tr_details)
@ -647,52 +698,25 @@ async def process_log(log_result):
# "Program log: source_token_change: 58730110139, destination_token_change: 270131294",
async def get_transaction_details_info(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]:
token_in = None
token_out = None
amount_in = 0
amount_out = 0
order_id = None
try:
token_in, token_out, amount_in, amount_out = await get_swap_transaction_details(tx_signature_str)
tr_info = await get_swap_transaction_details(tx_signature_str)
except Exception as e:
logging.error(f"Error fetching swap transaction details: {e}")
transaction_details = await get_transaction_details_rpc(tx_signature_str)
token_in, token_out, amount_in, amount_out = _extract_token_info(transaction_details)
tr_info = await get_transaction_details_rpc(tx_signature_str)
# Fetch token prices
token_prices = await get_token_prices([token_in, token_out])
token_prices = await get_token_prices([tr_info['token_in'], tr_info['token_out']])
# Calculate USD values
amount_in_usd = amount_in * token_prices.get(token_in, 0)
amount_out_usd = amount_out * token_prices.get(token_out, 0)
# Get the pre-balance of the input token
before_source_balance = _get_pre_balance(transaction_details, token_in)
tr_info['amount_in_usd'] = tr_info['amount_in'] * token_prices.get(tr_info['token_in'], 0)
tr_info['amount_out_usd'] = tr_info['amount_out'] * token_prices.get(tr_info['token_out'], 0)
# Calculate the percentage of the source balance that was swapped
percentage_swapped = (amount_in / before_source_balance) * 100 if before_source_balance > 0 else 0
tr_info['percentage_swapped'] = (tr_info['amount_in'] / tr_info['before_source_balance']) * 100 if tr_info['before_source_balance'] > 0 else 50
return tr_info
return {
"order_id": order_id,
"token_in": token_in,
"token_out": token_out,
"amount_in": amount_in,
"amount_out": amount_out,
"amount_in_USD": amount_in_usd,
"amount_out_USD": amount_out_usd,
"percentage_swapped": percentage_swapped
}
def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', [])
for balance in pre_balances:
if balance['mint'] == token:
return float(balance['uiTokenAmount']['amount'])
return 0.0
def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', [])
for balance in pre_balances:
@ -706,9 +730,9 @@ async def follow_move(move):
your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
if not your_balance_info:
message = f"<b>Move Failed:</b>\nNo balance found for token {move['token_in']}"
logging.warning(message)
await send_telegram_message(message)
msg = f"<b>Move Failed:</b>\nNo balance found for token {move['token_in']}"
logging.warning(msg)
await send_telegram_message(msg)
return
your_balance = your_balance_info['amount']
@ -732,17 +756,45 @@ async def follow_move(move):
)
instructions = [transaction_data['instruction']] # Adjust as needed
message = Message(instructions, private_key.pubkey())
blockhash = await async_client.get_latest_blockhash()
tx = Transaction([private_key], message, blockhash.value)
raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message))
signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
transaction_id = json.loads(result.to_json())['result']
print(f"Transaction sent: https://explorer.solana.com/tx/{transaction_id}")
result = await async_client.send_transaction(tx, private_key)
transaction_id = result.value
# # transaction_data is already a string, no need to json.loads()
# raw_transaction = base64.b64decode(transaction_data)
# # Send the raw transaction
# opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
# result = await async_client.send_raw_transaction(txn=raw_transaction, opts=opts)
# transaction_id = result.value # This should be the transaction signature
# Fetch the transaction details to get the output amount
tx_details = await async_client.get_transaction(transaction_id)
output_amount = tx_details.value.meta.post_balances[1] - tx_details.value.meta.pre_balances[1]
output_token_info = your_balances.get(move['token_out'], {'name': 'Unknown'})
output_token_name = output_token_info['name']
# transaction_data = json.loads(transaction_data)
# instructions = [transaction_data['instruction']] # Adjust as needed
# message = Message(instructions, private_key.pubkey())
# blockhash = await async_client.get_latest_blockhash()
# tx = Transaction([private_key], message, blockhash.value)
# result = await async_client.send_transaction(tx, private_key)
# transaction_id = result.value
# output_token_info = your_balances.get(move['token_out'], {'name': 'Unknown'})
# output_token_name = output_token_info['name']
# raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
# tx_message = raw_transaction.message
# signature = private_key.sign_message(tx_message.to_bytes_versioned())
@ -764,16 +816,16 @@ async def follow_move(move):
await send_telegram_message(notification)
except Exception as e:
error_message = f"<b>Swap Error:</b>\n{str(e)}"
error_message = f"<b>Swap Follow Error:</b>\n{str(e)}"
logging.error(error_message)
await send_telegram_message(error_message)
# await send_telegram_message(error_message)
else:
message = (
f"<b>Move Failed:</b>\n"
msg = (
f"<b>Move Not Followed:</b>\n"
f"Insufficient balance to swap {amount_to_swap:.6f} {token_name} ({move['token_in']})"
)
logging.warning(message)
await send_telegram_message(message)
logging.warning(msg)
await send_telegram_message(msg)
# Helper functions (implement these according to your needs)
@ -868,7 +920,7 @@ async def main():
# Initialize logging
logging.basicConfig(level=logging.DEBUG)
await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
await subscribe_to_wallet()
# await subscribe_to_wallet()
def run_flask():
# Run Flask app without the reloader, so we can run the async main function