diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 1cc2400..25739d7 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -6,7 +6,7 @@ from solana.rpc.async_api import AsyncClient from solana.transaction import Signature from solana.rpc.websocket_api import connect from solana.rpc.types import TokenAccountOpts, TxOpts -from solana.rpc.commitment import Confirmed +from solana.rpc.commitment import Confirmed, Processed from solana.transaction import Transaction from base64 import b64decode import base58 @@ -20,6 +20,7 @@ from solders.message import Message from solders.instruction import Instruction from solders.hash import Hash from solders.instruction import CompiledInstruction +from solders import message from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA from dexscreener import DexscreenerClient from telegram import Bot @@ -77,12 +78,13 @@ def retry_last_log(): with open(latest_log_file, 'r') as f: log = json.load(f) + # await process_log(log) # Run the asynchronous process_log function asyncio.run(process_log(log)) - return jsonify({"status": "Log processed successfully"}), 200 + return jsonify({"status": "Log dump processed successfully"}), 200 except Exception as e: - logging.error(f"Error processing log: {e}") + logging.error(f"Error processing log dump: {e}") return jsonify({"error": "Failed to process log"}), 500 @@ -202,18 +204,6 @@ async def get_sol_price_from_dexscreener() -> float: prices = await get_prices_from_dexscreener([sol_address]) return prices.get(sol_address, 0.0) -async def get_sol_price() -> float: - url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}" - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - return data['solana'][DISPLAY_CURRENCY.lower()] - else: - logging.error(f"Failed to get SOL price. Status: {response.status}") - return None - - async def get_token_balance_rpc(wallet_address, token_address): url = SOLANA_HTTP_URL headers = {"Content-Type": "application/json"} @@ -324,32 +314,33 @@ async def get_wallet_balances(wallet_address): return balances -async def convert_balances_to_currency(balances, token_prices, sol_price): +async def convert_balances_to_currency(balances , sol_price): converted_balances = {} for address, info in balances.items(): converted_balance = info.copy() # Create a copy of the original info if info['name'] == 'SOL': converted_balance['value'] = info['amount'] * sol_price - elif address in token_prices: - converted_balance['value'] = info['amount'] * token_prices[address] + elif address in TOKEN_PRICES: + converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] else: converted_balance['value'] = None # Price not available logging.warning(f"Price not available for token {info['name']} ({address})") converted_balances[address] = converted_balance return converted_balances + async def list_initial_wallet_states(): - global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE + global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) your_wallet_balances = await get_wallet_balances(YOUR_WALLET) all_token_addresses = list(set(followed_wallet_balances.keys()) | set(your_wallet_balances.keys())) - token_prices = await get_token_prices(all_token_addresses) + TOKEN_PRICES = await get_token_prices(all_token_addresses) sol_price = await get_sol_price() - followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, token_prices, sol_price) - your_converted_balances = await convert_balances_to_currency(your_wallet_balances, token_prices, sol_price) + followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) + your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) TOKEN_ADDRESSES = {address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0} logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") @@ -386,20 +377,6 @@ async def list_initial_wallet_states(): async def get_swap_transaction_details(tx_signature_str): t = await solana_client.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0) try: - transaction = t.value.transaction - message = transaction.transaction.message - instructions = message.instructions - - accounts = t.value.transaction.transaction.message.instructions[0].accounts - instructions = t.value.transaction.transaction.message.instructions - - # Assume the swap is the first instruction - swap_instruction = instructions[0] - - # Extract accounts involved in the swap instruction - accounts = swap_instruction.accounts - - # Initialize result dictionary parsed_result = { "order_id": None, "token_in": None, @@ -418,6 +395,7 @@ async def get_swap_transaction_details(tx_signature_str): # parsed_result["order_id"] = log.split(":")[1].strip() # break + instructions = t.value.transaction.transaction.message.instructions # Parse the swap instruction to extract token addresses, amounts, and types for instruction in instructions: if isinstance(instruction, CompiledInstruction): @@ -455,83 +433,156 @@ async def get_swap_transaction_details(tx_signature_str): async def get_transaction_details_rpc(tx_signature, readfromDump=False): - url = SOLANA_HTTP_URL - # url = 'https://solana.drpc.org' - headers = {"Content-Type": "application/json"} - data = { - "jsonrpc": "2.0", - "id": 1, - "method": "getTransaction", - "params": [ - tx_signature, - { - "encoding": "jsonParsed", - "maxSupportedTransactionVersion": 0 - } - ] - } + try: if readfromDump and os.path.exists('./logs/transation_details.json'): with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details transaction_details = json.load(f) return transaction_details else: - response = requests.post(url, headers=headers, data=json.dumps(data)) - response.raise_for_status() # Raises an error for bad responses - transaction_details = response.json() + transaction_details = await solana_jsonrpc("getTransaction", [tx_signature]) with open('./logs/transation_details.json', 'w') as f: json.dump(transaction_details, f, indent=2) - if 'result' in transaction_details: - result = transaction_details['result'] + # Initialize default result structure + parsed_result = { + "order_id": None, + "token_in": None, + "token_out": None, + "amount_in": 0, + "amount_out": 0, + "amount_in_USD": 0, + "amount_out_USD": 0, + "percentage_swapped": 0 + } - # Initialize default result structure - parsed_result = { - "order_id": None, - "token_in": None, - "token_out": None, - "amount_in": 0, - "amount_out": 0, - "amount_in_USD": 0, - "amount_out_USD": 0, - "percentage_swapped": 0 - } + # Extract order_id from logs + log_messages = transaction_details.get("meta", {}).get("logMessages", []) + for log in log_messages: + if "order_id" in log: + parsed_result["order_id"] = log.split(":")[2].strip() + break - # Extract order_id from logs - log_messages = result.get("meta", {}).get("logMessages", []) - for log in log_messages: - if "order_id" in log: - parsed_result["order_id"] = log.split(":")[2].strip() - break + # Extract token transfers from innerInstructions + inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) + for instruction_set in inner_instructions: + for instruction in instruction_set.get('instructions', []): + if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked': + info = instruction['parsed']['info'] + mint = info['mint'] + amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals + + # Determine which token is being swapped in and out based on zero balances + if parsed_result["token_in"] is None and amount > 0: + parsed_result["token_in"] = mint + parsed_result["amount_in"] = amount + + + if parsed_result["token_in"] is None or parsed_result["token_out"] is None: + # if we've failed to extract token_in and token_out from the transaction details, try a second method + inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) + transfers = [] - # Extract token transfers from innerInstructions - inner_instructions = result.get('meta', {}).get('innerInstructions', []) for instruction_set in inner_instructions: for instruction in instruction_set.get('instructions', []): - if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked': + if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']: info = instruction['parsed']['info'] - mint = info['mint'] - amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals - - # Determine which token is being swapped in and out based on zero balances - if parsed_result["token_in"] is None and amount > 0: - parsed_result["token_in"] = mint - parsed_result["amount_in"] = amount + amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount']) + decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0 + adjusted_amount = amount / (10 ** decimals) + transfers.append({ + 'mint': info.get('mint'), + 'amount': adjusted_amount, + 'source': info['source'], + 'destination': info['destination'] + }) + + # Identify token_in and token_out + if len(transfers) >= 2: + parsed_result["token_in"] = transfers[0]['mint'] + parsed_result["amount_in"] = transfers[0]['amount'] + parsed_result["token_out"] = transfers[-1]['mint'] + parsed_result["amount_out"] = transfers[-1]['amount'] + # If mint is not provided, query the Solana network for the account data + if parsed_result["token_in"] is None or parsed_result["token_out"] is None: + #for transfer in transfers: + # do only first and last transfer + for transfer in [transfers[0], transfers[-1]]: + if transfer['mint'] is None: + # Query the Solana network for the account data + account_data_result = await solana_jsonrpc("getAccountInfo", [transfer['source']]) - # Calculate USD values if token is USDC + if 'value' in account_data_result and 'data' in account_data_result['value']: + account_data_value = account_data_result['value'] + account_data_data = account_data_value['data'] + if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: + account_data_info = account_data_data['parsed']['info'] + if 'mint' in account_data_info: + transfer['mint'] = account_data_info['mint'] + if parsed_result["token_in"] is None: + parsed_result["token_in"] = transfer['mint'] + parsed_result["amount_in"] = transfer['amount']/10**6 + elif parsed_result["token_out"] is None: + parsed_result["token_out"] = transfer['mint'] + parsed_result["amount_out"] = transfer['amount']/10**6 - # Calculate percentage swapped - if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0: - parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100 + pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', []) + for balance in pre_balalnces: + if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET: + parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals'] + break + + + # Calculate percentage swapped + if parsed_result["amount_in"] > 0 and parsed_result["before_source_balance"] > 0: + parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100 + + return parsed_result - return parsed_result - else: - print("Unexpected response:", transaction_details) - return None except requests.exceptions.RequestException as e: print("Error fetching transaction details:", e) + +async def solana_jsonrpc(method, params = None, jsonParsed = True): + # target json example: + # data = { + # "jsonrpc": "2.0", + # "id": 1, + # "method": "getTransaction", + # "params": [ + # tx_signature, + # { + # "encoding": "jsonParsed", + # "maxSupportedTransactionVersion": 0 + # } + # ] + # } + + data = { + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params or [] + } + data["params"].append({"maxSupportedTransactionVersion": 0}) + if jsonParsed: + data["params"][1]["encoding"] = "jsonParsed" + + + try: + # url = 'https://solana.drpc.org' + response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) + response.raise_for_status() # Raises an error for bad responses + result = response.json() + if not 'result' in result or 'error' in result: + print("Error fetching data from Solana RPC:", result) + return None + return result['result'] + except Exception as e: + logging.error(f"Error fetching data from Solana RPC: {e}") + return None + async def save_log(log): try: @@ -612,23 +663,23 @@ async def process_log(log_result): source_token_change = float(part.split(":")[-1].strip()) / 10 ** 6 - - if tr_details["order_id"] is None or tr_details["token_in"] is None or tr_details["token_out"] is None or tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0: + # GET DETAILS FROM TRANSACTION IF NOT FOUND IN LOGS + if tr_details["token_in"] is None or tr_details["token_out"] is None or tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0: logging.warning("Incomplete swap details found in logs. Getting details from transaction") - details = await get_transaction_details_info(tx_signature_str, logs) - + tr_details = await get_transaction_details_info(tx_signature_str, logs) + # onlt needed if no details got if before_source_balance > 0 and source_token_change > 0: tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100 - if tr_details["order_id"] is not None : - message_text = ( - f"Swap detected:\n" - f"Order ID: {tr_details['order_id']}\n" - f"Token In: {tr_details['token_in']}\n" - f"Token Out: {tr_details['token_out']}\n" - f"Amount In USD: {tr_details['amount_in_USD']}\n" - f"Percentage Swapped: {tr_details['percentage_swapped']:.2f}%" - ) + + message_text = ( + f"Swap detected:\n" + f"Order ID: {tr_details['order_id']}\n" + f"Token In: {tr_details['token_in']}\n" + f"Token Out: {tr_details['token_out']}\n" + f"Amount In USD: {tr_details['amount_in_USD']}\n" + f"Percentage Swapped: {tr_details['percentage_swapped']:.2f}%" + ) await send_telegram_message(message_text) await follow_move(tr_details) @@ -647,52 +698,25 @@ async def process_log(log_result): # "Program log: source_token_change: 58730110139, destination_token_change: 270131294", async def get_transaction_details_info(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]: - token_in = None - token_out = None - amount_in = 0 - amount_out = 0 - order_id = None - + try: - token_in, token_out, amount_in, amount_out = await get_swap_transaction_details(tx_signature_str) + tr_info = await get_swap_transaction_details(tx_signature_str) except Exception as e: logging.error(f"Error fetching swap transaction details: {e}") - transaction_details = await get_transaction_details_rpc(tx_signature_str) - token_in, token_out, amount_in, amount_out = _extract_token_info(transaction_details) + tr_info = await get_transaction_details_rpc(tx_signature_str) # Fetch token prices - token_prices = await get_token_prices([token_in, token_out]) + token_prices = await get_token_prices([tr_info['token_in'], tr_info['token_out']]) # Calculate USD values - amount_in_usd = amount_in * token_prices.get(token_in, 0) - amount_out_usd = amount_out * token_prices.get(token_out, 0) - - # Get the pre-balance of the input token - before_source_balance = _get_pre_balance(transaction_details, token_in) + tr_info['amount_in_usd'] = tr_info['amount_in'] * token_prices.get(tr_info['token_in'], 0) + tr_info['amount_out_usd'] = tr_info['amount_out'] * token_prices.get(tr_info['token_out'], 0) # Calculate the percentage of the source balance that was swapped - percentage_swapped = (amount_in / before_source_balance) * 100 if before_source_balance > 0 else 0 + tr_info['percentage_swapped'] = (tr_info['amount_in'] / tr_info['before_source_balance']) * 100 if tr_info['before_source_balance'] > 0 else 50 + return tr_info - return { - "order_id": order_id, - "token_in": token_in, - "token_out": token_out, - "amount_in": amount_in, - "amount_out": amount_out, - "amount_in_USD": amount_in_usd, - "amount_out_USD": amount_out_usd, - "percentage_swapped": percentage_swapped - } - - - -def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: - pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', []) - for balance in pre_balances: - if balance['mint'] == token: - return float(balance['uiTokenAmount']['amount']) - return 0.0 def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', []) for balance in pre_balances: @@ -706,9 +730,9 @@ async def follow_move(move): your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) if not your_balance_info: - message = f"Move Failed:\nNo balance found for token {move['token_in']}" - logging.warning(message) - await send_telegram_message(message) + msg = f"Move Failed:\nNo balance found for token {move['token_in']}" + logging.warning(msg) + await send_telegram_message(msg) return your_balance = your_balance_info['amount'] @@ -732,17 +756,45 @@ async def follow_move(move): ) - instructions = [transaction_data['instruction']] # Adjust as needed - message = Message(instructions, private_key.pubkey()) - blockhash = await async_client.get_latest_blockhash() - tx = Transaction([private_key], message, blockhash.value) + raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data)) + signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message)) + signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature]) + opts = TxOpts(skip_preflight=False, preflight_commitment=Processed) + result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts) + transaction_id = json.loads(result.to_json())['result'] + print(f"Transaction sent: https://explorer.solana.com/tx/{transaction_id}") - result = await async_client.send_transaction(tx, private_key) - transaction_id = result.value + + + # # transaction_data is already a string, no need to json.loads() + # raw_transaction = base64.b64decode(transaction_data) + # # Send the raw transaction + # opts = TxOpts(skip_preflight=False, preflight_commitment=Processed) + # result = await async_client.send_raw_transaction(txn=raw_transaction, opts=opts) + # transaction_id = result.value # This should be the transaction signature + + # Fetch the transaction details to get the output amount + tx_details = await async_client.get_transaction(transaction_id) + output_amount = tx_details.value.meta.post_balances[1] - tx_details.value.meta.pre_balances[1] output_token_info = your_balances.get(move['token_out'], {'name': 'Unknown'}) output_token_name = output_token_info['name'] + + + # transaction_data = json.loads(transaction_data) + + # instructions = [transaction_data['instruction']] # Adjust as needed + # message = Message(instructions, private_key.pubkey()) + # blockhash = await async_client.get_latest_blockhash() + # tx = Transaction([private_key], message, blockhash.value) + + # result = await async_client.send_transaction(tx, private_key) + # transaction_id = result.value + + # output_token_info = your_balances.get(move['token_out'], {'name': 'Unknown'}) + # output_token_name = output_token_info['name'] + # raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data)) # tx_message = raw_transaction.message # signature = private_key.sign_message(tx_message.to_bytes_versioned()) @@ -764,16 +816,16 @@ async def follow_move(move): await send_telegram_message(notification) except Exception as e: - error_message = f"Swap Error:\n{str(e)}" + error_message = f"Swap Follow Error:\n{str(e)}" logging.error(error_message) - await send_telegram_message(error_message) + # await send_telegram_message(error_message) else: - message = ( - f"Move Failed:\n" + msg = ( + f"Move Not Followed:\n" f"Insufficient balance to swap {amount_to_swap:.6f} {token_name} ({move['token_in']})" ) - logging.warning(message) - await send_telegram_message(message) + logging.warning(msg) + await send_telegram_message(msg) # Helper functions (implement these according to your needs) @@ -868,7 +920,7 @@ async def main(): # Initialize logging logging.basicConfig(level=logging.DEBUG) await send_telegram_message("Solana Agent Started. Connecting to mainnet...") - await subscribe_to_wallet() + # await subscribe_to_wallet() def run_flask(): # Run Flask app without the reloader, so we can run the async main function