import base64
import struct
import sys
import os
import aiohttp

# Make the parent of this script's directory importable (config, modules.*).
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Get the directory where the current script is located
script_dir = os.path.dirname(os.path.abspath(__file__))
root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
from dexscreener import DexscreenerClient
from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.async_api import AsyncClient
from solana.transaction import Signature
from solana.rpc.websocket_api import connect
from solana.rpc.commitment import Confirmed, Processed
from solana.transaction import Transaction
from spl.token.client import Token
from base64 import b64decode
import base58
from threading import Thread
from solana.rpc.async_api import AsyncClient
from solana.rpc.types import TxOpts
from solana.rpc.commitment import Confirmed, Finalized, Processed
from solders.rpc.requests import GetTransaction
from solders.signature import Signature
from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.transaction import VersionedTransaction
from solders.transaction import Transaction
from solders.message import Message
from solders.instruction import Instruction
from solders.hash import Hash
from solders.instruction import CompiledInstruction
from solders import message
from jupiter_python_sdk.jupiter import Jupiter
import asyncio
import json
import logging
import random
import websockets
from typing import Dict, List, Optional
import requests
from datetime import datetime
from solana.rpc.types import TokenAccountOpts, TxOpts
from typing import List, Dict, Any, Tuple
import traceback
import httpx

# # # solders/solana libs (solana_client) # # #
from spl.token._layouts import MINT_LAYOUT
from solana.rpc.api import Client, Pubkey
from spl.token.async_client import AsyncToken
from spl.token.constants import TOKEN_PROGRAM_ID
from borsh_construct import String, CStruct
# ------------------

logger = logging.getLogger(__name__)

PING_INTERVAL = 30
# NOTE(review): the original comment said "Resubscribe every 1 minute", but the
# value is 10*60 = 600; if the unit is seconds that is 10 minutes - confirm.
SUBSCRIBE_INTERVAL = 10*60

from config import (FOLLOW_AMOUNT, FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS, YOUR_WALLET, SOLANA_WS_URL)
from modules.utils import telegram_utils, async_safe_call, get_pk

# Use the production Solana RPC endpoint
solana_client = AsyncClient(SOLANA_HTTP_URL)
dexscreener_client = DexscreenerClient()


class SolanaWS:
    """JSON-RPC helper for a Solana websocket connection plus an HTTP fallback.

    Holds one websocket, the id of the current ``logsSubscribe`` subscription,
    and a queue onto which every received ``result`` payload is pushed.
    """

    def __init__(self, on_message: Optional[callable] = None):
        """Create an unconnected client.

        Args:
            on_message: Optional coroutine function awaited by
                ``process_messages`` with every dequeued message.
        """
        self.websocket = None
        self.subscription_id = None
        self.message_queue = asyncio.Queue()
        self.on_message = on_message
        self.last_msg_responded = False

    @staticmethod
    async def save_log(log):
        """Dump ``log`` as indented JSON to ``./logs/ws_response_<timestamp>.json``.

        Best-effort: any failure is logged and swallowed so that logging a
        response can never break the RPC call that produced it.

        Declared as a static method because the body needs no instance state
        while callers invoke it as ``self.save_log(payload)``; the original
        signature lacked ``self`` and therefore raised ``TypeError`` on call.
        """
        try:
            os.makedirs('./logs', exist_ok=True)
            # ``datetime`` here is the class (``from datetime import datetime``).
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            filename = f"./logs/ws_response_{timestamp}.json"
            with open(filename, 'w') as f:
                json.dump(log, f, indent=2)
        except Exception as e:
            logger.error(f"Error saving RPC log: {e}")

    async def connect(self):
        """Connect to a randomly chosen endpoint, retrying every 5 s until one succeeds."""
        while True:
            # Pre-bind so the error message below works even if choosing the
            # endpoint itself fails (e.g. SOLANA_ENDPOINTS is empty).
            current_url = None
            try:
                current_url = random.choice(SOLANA_ENDPOINTS)
                self.websocket = await websockets.connect(current_url, ping_interval=30, ping_timeout=10)
                logger.info(f"Connected to Solana websocket: {current_url}")
                return
            except Exception as e:
                logger.error(f"Failed to connect to {current_url}: {e}")
                await asyncio.sleep(5)

    async def ws_jsonrpc(self, method, params=None, doProcessResponse = True):
        """Send a JSON-RPC request over the websocket.

        Args:
            method: JSON-RPC method name.
            params: A list of parameters, a single parameter (wrapped into a
                list), or ``None`` for no parameters.
            doProcessResponse: When false, return right after sending and leave
                reading the reply to the caller.

        Returns:
            The ``result`` member of the next frame received, or ``None`` when
            the response is not awaited, carries an ``error``, or has neither key.
        """
        if not isinstance(params, list):
            params = [params] if params is not None else []

        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params
        }

        self.last_msg_responded = False
        await self.websocket.send(json.dumps(request))
        if not doProcessResponse:
            return None

        # NOTE(review): the next frame is assumed to be the reply to this
        # request; nothing here matches it against the request id.
        response = await self.websocket.recv()
        response_data = json.loads(response)
        self.last_msg_responded = True
        await self.save_log(response_data)

        if 'result' in response_data:
            return response_data['result']
        if 'error' in response_data:
            logger.error(f"Error in WebSocket RPC call: {response_data['error']}")
            return None
        logger.warning(f"Unexpected response: {response_data}")
        return None

    async def subscribe(self):
        """Issue ``logsSubscribe`` for FOLLOWED_WALLET and record the subscription id.

        The request is sent without reading the reply; the reply is then pulled
        through ``receive_messages``/``process_messages`` in single-message mode.
        """
        params = [
            {"mentions": [FOLLOWED_WALLET]},
            {"commitment": "confirmed"}
        ]
        await self.ws_jsonrpc("logsSubscribe", params, False)
        await asyncio.sleep(.4)
        await self.receive_messages(True)
        result = await self.process_messages(True)

        if result is not None and result > 0:
            self.subscription_id = result
            logger.info(f"Subscription successful. Subscription id: {self.subscription_id}")
        elif result:
            logger.error("already subscribed")
        else:
            logger.error("Failed to subscribe")

    async def unsubscribe(self):
        """Send ``logsUnsubscribe`` for the current subscription, if there is one."""
        if self.subscription_id:
            result = await self.ws_jsonrpc("logsUnsubscribe", [self.subscription_id])
            if result:
                logger.info(f"Unsubscribed from subscription id: {self.subscription_id}")
                self.subscription_id = None
            else:
                logger.error(f"Failed to unsubscribe from subscription id: {self.subscription_id}")

    async def receive_messages(self, one = False):
        """Read frames from the websocket and enqueue every ``result`` payload.

        Args:
            one: When true, stop after the first frame that carries ``result``.

        The loop also ends (after logging) on a closed connection or any other
        receive/parse error.
        """
        while True:
            try:
                response = await self.websocket.recv()
                response_data = json.loads(response)
                self.last_msg_responded = True
                if 'result' in response_data:
                    await self.message_queue.put(response_data['result'])
                    if one:
                        break
            except websockets.exceptions.ConnectionClosedError:
                logger.error("WebSocket connection closed")
                break
            except Exception as e:
                logger.error(f"Error receiving message: {e}")
                break

    async def process_messages(self, one = False):
        """Drain the message queue, forwarding each message to ``on_message``.

        Args:
            one: When true, return the first dequeued message instead of
                looping forever.

        Returns:
            The dequeued message when ``one`` is true; otherwise never returns.
        """
        while True:
            message = await self.message_queue.get()
            # Plain ints are treated as subscription ids; bools (also ints in
            # Python) are excluded to keep the original exact-type check.
            if isinstance(message, int) and not isinstance(message, bool):
                subscription_id = message
                logger.info(f"Subscription id: {subscription_id}")
            # NOTE(review): on_message is also invoked for the subscription-id
            # message above - confirm the callback tolerates that.
            if self.on_message:
                await self.on_message(message)
            logger.info(f"Received message: {message}")
            if one:
                return message

    async def close(self):
        """Close the websocket if one is open."""
        if self.websocket:
            await self.websocket.close()
            logger.info("WebSocket connection closed")

    async def solana_jsonrpc(self, method, params=None, jsonParsed=True):
        """POST a JSON-RPC request to SOLANA_HTTP_URL.

        An options object is always appended as the last parameter:
        ``maxSupportedTransactionVersion: 0`` plus, when ``jsonParsed`` is
        true, ``encoding: "jsonParsed"``.

        Args:
            method: JSON-RPC method name.
            params: A list of parameters, a single parameter, or ``None``.
                A list passed in is copied, never modified.
            jsonParsed: Whether to request the ``jsonParsed`` encoding.

        Returns:
            The ``result`` member of the response, or ``None`` on any HTTP,
            transport or RPC-level error (the error is logged).
        """
        # Work on a copy so the options object appended below does not leak
        # into the caller's list.
        if isinstance(params, list):
            params = list(params)
        else:
            params = [params] if params is not None else []

        if jsonParsed:
            params.append({"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0})
        else:
            params.append({"maxSupportedTransactionVersion": 0})

        data = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params
        }

        try:
            # NOTE(review): requests.post is a blocking call inside a coroutine
            # and has no timeout; it stalls the event loop while in flight.
            response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data))
            response.raise_for_status()
            result = response.json()
            if 'result' not in result or 'error' in result:
                # A %s placeholder is required; passing an argument without
                # one makes the logging module fail to format the record.
                logger.error("Error fetching data from Solana RPC: %s", result)
                return None
            return result['result']
        except Exception as e:
            logger.error(f"Error fetching data from Solana RPC: {e}")
            return None


class SolanaAPI:
    """Callback-driven facade over SolanaWS and the Jupiter HTTP endpoints."""

    # Base58-encoded private key; None until it is loaded.
    pk = None

    # Jupiter quote/swap and limit-order endpoint URLs, keyed by purpose.
    ENDPOINT_APIS_URL = {
        "QUOTE": "https://quote-api.jup.ag/v6/quote?",
        "SWAP": "https://quote-api.jup.ag/v6/swap",
        "OPEN_ORDER": "https://jup.ag/api/limit/v1/createOrder",
        "CANCEL_ORDERS": "https://jup.ag/api/limit/v1/cancelOrders",
        "QUERY_OPEN_ORDERS": "https://jup.ag/api/limit/v1/openOrders?wallet=",
        "QUERY_ORDER_HISTORY": "https://jup.ag/api/limit/v1/orderHistory",
        "QUERY_TRADE_HISTORY": "https://jup.ag/api/limit/v1/tradeHistory"
    }

    def __init__(self, process_transaction_callback = None, on_initial_subscription_callback = None, on_bot_message=None):
        """Store the callbacks and create the websocket helper.

        Args:
            process_transaction_callback: Stored as ``process_transaction`` and
                handed to SolanaWS as its ``on_message`` callback.
            on_initial_subscription_callback: Stored as
                ``on_initial_subscription``.
            on_bot_message: Callable receiving status messages; when it is not
                callable, messages are written to the module logger instead.
        """
        self.process_transaction = process_transaction_callback
        self.on_initial_subscription = on_initial_subscription_callback

        # Fallback used when no usable on_bot_message was supplied.
        def default_on_bot_message(message):
            return logger.info(f"Bot message: {message}")

        self.on_bot_message = on_bot_message if callable(on_bot_message) else default_on_bot_message
        # DEX is a module-level object defined elsewhere in this file.
        self.dex = DEX
        self.solana_ws = SolanaWS(on_message=self.process_transaction)
async def process_messages(self, solana_ws): while True: message = await solana_ws.message_queue.get() await self.process_transaction(message) _first_subscription = True async def wallet_watch_loop(self): solana_ws = SolanaWS(on_message=self.process_transaction) first_subscription = True while True: try: await solana_ws.connect() await solana_ws.subscribe() if first_subscription: first_subscription = False await async_safe_call(self.on_initial_subscription, solana_ws.subscription_id) await async_safe_call(self.on_bot_message,f"Solana mainnet connected ({solana_ws.subscription_id})...") receive_task = asyncio.create_task(solana_ws.receive_messages()) process_task = asyncio.create_task(solana_ws.process_messages()) try: await asyncio.gather(receive_task, process_task) except asyncio.CancelledError: pass finally: receive_task.cancel() process_task.cancel() except Exception as e: logger.error(f"An unexpected error occurred: {e}") logger.error("".join(traceback.format_exception(None, e, e.__traceback__))) finally: try: await solana_ws.unsubscribe() if solana_ws.websocket: await solana_ws.close() await async_safe_call(self.on_bot_message,"Reconnecting...") if receive_task and not receive_task.cancelled(): receive_task.cancel() if process_task and not process_task.cancelled(): process_task.cancel() except Exception as e: logger.error(f"An error occurred while unsubscribing: {e}") finally: receive_task = None process_task = None await asyncio.sleep(5) async def get_last_transactions(self, account_address, check_interval=300, limit=1000): last_check_time = None last_signature = None while True: current_time = datetime.now() if last_check_time is None or (current_time - last_check_time).total_seconds() >= check_interval: params = [ account_address, { "limit": limit } ] if last_signature: params[1]["before"] = last_signature result = await self.solana_ws.solana_jsonrpc("getSignaturesForAddress", params) if result: for signature in result: if last_signature and 
signature['signature'] == last_signature: break await self.process_transaction(signature) if result: last_signature = result[0]['signature'] last_check_time = current_time await asyncio.sleep(1) async def get_token_metadata_symbol(self, mint_address): if mint_address in DEX.TOKENS_INFO and 'symbol' in DEX.TOKENS_INFO[mint_address]: return DEX.TOKENS_INFO[mint_address].get('symbol') try: account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", mint_address) if 'value' in account_data_result and 'data' in account_data_result['value']: account_data_data = account_data_result['value']['data'] if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: account_data_info = account_data_data['parsed']['info'] if 'decimals' in account_data_info: if mint_address in DEX.TOKENS_INFO: DEX.TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals'] else: DEX.TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']} if 'tokenName' in account_data_info: if mint_address in DEX.TOKENS_INFO: DEX.TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName'] else: DEX.TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']} metadata = await self.get_token_metadata(mint_address) if metadata: if mint_address in DEX.TOKENS_INFO: DEX.TOKENS_INFO[mint_address].update(metadata) else: DEX.TOKENS_INFO[mint_address] = metadata await DEX.save_token_info() # DEX.TOKENS_INFO[mint_address] = metadata # return metadata.get('symbol') or metadata.get('name') return DEX.TOKENS_INFO[mint_address].get('symbol') except Exception as e: logging.error(f"Error fetching token name for {mint_address}: {str(e)}") return None async def get_token_metadata(self, mint_address): try: # Convert mint_address to PublicKey if it's a string if isinstance(mint_address, str): mint_pubkey = Pubkey.from_string(mint_address) else: mint_pubkey = mint_address # Derive metadata account address metadata_program_id = 
Pubkey.from_string("metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s") metadata_account = Pubkey.find_program_address( [b"metadata", bytes(metadata_program_id), bytes(mint_pubkey)], metadata_program_id )[0] # Fetch metadata account info metadata_account_info = await solana_client.get_account_info(metadata_account) if metadata_account_info.value is not None: data = metadata_account_info.value.data # name = get_token_name_metadata(data).rstrip("\x00") # Skip the first 1 + 32 + 32 bytes (1 byte for version, 32 bytes each for update authority and mint) offset = 1 + 32 + 32 # Read the name length (u32) name_length = struct.unpack(" 0: parsed_result["token_in"] = mint parsed_result["amount_in"] = amount if parsed_result["token_in"] is None or parsed_result["token_out"] is None: # if we've failed to extract token_in and token_out from the transaction details, try a second method inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) transfers = [] for instruction_set in inner_instructions: for instruction in instruction_set.get('instructions', []): if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']: info = instruction['parsed']['info'] amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount']) decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0 adjusted_amount = amount / (10 ** decimals) # adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0)) transfers.append({ 'mint': info.get('mint'), 'amount': adjusted_amount, 'source': info['source'], 'destination': info['destination'] }) # Identify token_in and token_out if len(transfers) >= 2: parsed_result["token_in"] = transfers[0]['mint'] parsed_result["amount_in"] = transfers[0]['amount'] parsed_result["token_out"] = transfers[-1]['mint'] parsed_result["amount_out"] = transfers[-1]['amount'] # If mint is not 
provided, query the Solana network for the account data if parsed_result["token_in"] is None or parsed_result["token_out"] is None: #for transfer in transfers: # do only first and last transfer for transfer in [transfers[0], transfers[-1]]: if transfer['mint'] is None: # Query the Solana network for the account data account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", transfer['source']) if 'value' in account_data_result and 'data' in account_data_result['value']: account_data_value = account_data_result['value'] account_data_data = account_data_value['data'] if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: account_data_info = account_data_data['parsed']['info'] if 'mint' in account_data_info: transfer['mint'] = account_data_info['mint'] if transfer['mint'] in DEX.TOKENS_INFO or 'decimals' not in DEX.TOKENS_INFO[transfer['mint']]: await self.get_token_metadata_symbol(transfer['mint']) # get actual prices current_price = await DEX.get_token_prices([transfer['mint']]) if parsed_result["token_in"] is None: parsed_result["token_in"] = transfer['mint'] parsed_result["symbol_in"] = DEX.TOKENS_INFO[transfer['mint']]['symbol'] parsed_result["amount_in"] = transfer['amount']/10**DEX.TOKENS_INFO[transfer['mint']]['decimals'] parsed_result["amount_in_USD"] = parsed_result["amount_in"] * DEX.TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']]) elif parsed_result["token_out"] is None: parsed_result["token_out"] = transfer['mint'] parsed_result["symbol_out"] = DEX.TOKENS_INFO[transfer['mint']]['symbol'] parsed_result["amount_out"] = transfer['amount']/10**DEX.TOKENS_INFO[transfer['mint']]['decimals'] parsed_result["amount_out_USD"] = parsed_result["amount_out"] * DEX.TOKENS_INFO[transfer['mint']]['price'] pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', []) for balance in pre_balalnces: if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET: 
parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals'] break # Calculate percentage swapped try: if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0: parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100 else: # calculate based on total wallet value: FOLLOWED_WALLET_VALUE # division by 0 # parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / DEX.FOLLOWED_WALLET_VALUE) * 100 pass except Exception as e: logging.error(f"Error calculating percentage swapped: {e}") return parsed_result except requests.exceptions.RequestException as e: print("Error fetching transaction details:", e) # "Program log: Instruction: Swap2", # "Program log: order_id: 13985890735038016", # "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn", source # "Program log: EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", target # "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410", # "Program log: after_source_balance: 0, after_destination_balance: 472509072", # "Program log: source_token_change: 58730110139, destination_token_change: 270131294", async def get_transaction_details_info(self, tx_signature_str: str, logs: List[str]) -> Dict[str, Any]: tr_info = await self.get_transaction_details_with_retry(tx_signature_str) try: tr_info['percentage_swapped'] = (tr_info['amount_in'] / tr_info['before_source_balance']) * 100 if tr_info['before_source_balance'] > 0 else 50 except Exception as e: logging.error(f"Error calculating percentage swapped: {e}") return tr_info # def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: # pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', []) # for balance in pre_balances: # if 
balance['mint'] == token: # return float(balance['uiTokenAmount']['amount']) # return 0.0 async def get_transaction_details_with_retry(self, transaction_id, retry_delay = 5, max_retries = 16, backoff= True): # wait for the transaction to be confirmed # await async_client.wait_for_confirmation(Signature.from_string(transaction_id)) # query every 5 seconds for the transaction details until not None or 30 seconds for _ in range(max_retries): try: tx_details = await self.get_transaction_details_rpc(transaction_id) if tx_details is not None: break except Exception as e: logging.error(f"Error fetching transaction details: {e}") logging.info(f"({_} of {max_retries}) Waiting for transaction details for {transaction_id} [ retry in {retry_delay} s.]") await asyncio.sleep(retry_delay) if backoff: retry_delay = retry_delay * 1.2 return tx_details async def get_swap_transaction_details(self, tx_signature_str): t = await self.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0) try: parsed_result = { "order_id": None, "token_in": None, "token_out": None, "amount_in": 0, "amount_out": 0, "amount_in_USD": 0, "amount_out_USD": 0, "percentage_swapped": 0 } instructions = t.value.transaction.transaction.message.instructions # Parse the swap instruction to extract token addresses, amounts, and types for instruction in instructions: if isinstance(instruction, CompiledInstruction): if instruction.program_id == Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"): parsed_info = instruction.parsed.info mint = parsed_info["mint"] amount = float(parsed_info["tokenAmount"]["amount"]) / (10 ** parsed_info["tokenAmount"]["decimals"]) # Determine token in and token out based on balances if parsed_result["token_in"] is None and amount > 0: parsed_result["token_in"] = mint parsed_result["amount_in"] = amount elif parsed_result["token_out"] is None: parsed_result["token_out"] = mint parsed_result["amount_out"] = amount # Calculate percentage 
swapped if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0: parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100 return parsed_result except Exception as e: logging.error(f"Error fetching transaction details: {e}") return None async def get_token_balance_rpc(self, wallet_address, token_address): try: accounts = await self.solana_ws.solana_jsonrpc("getTokenAccountsByOwner", [ wallet_address, { "mint": token_address }]) if accounts['value']: first_account = accounts['value'][0]['pubkey'] balance_data = { "jsonrpc": "2.0", "id": 1, "method": "getTokenAccountBalance", "params": [ first_account ] } balance = self.solana_ws.solana_jsonrpc("getTokenAccountBalance", first_account) if 'value' in balance: amount = float(balance['value']['uiAmount']) logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}") return amount else: logging.debug(f"No balance found for {token_address} in {wallet_address}") return 0 else: logging.debug(f"No account found for {token_address} in {wallet_address}") return 0 except requests.exceptions.RequestException as e: logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}") return 0 async def swap_on_jupiter( self, input_mint: str, output_mint: str, amount: int, slippage_bps: int = 1, swap_mode: str = "ExactIn", priority_fee: int = 0, only_direct_routes: bool = False, as_legacy_transaction: bool = False, exclude_dexes: list = None, max_accounts: int = None ) -> str: """Perform a swap on Jupiter with an option to set priority. Args: input_mint (str): Input token mint. output_mint (str): Output token mint. amount (int): Amount to swap, considering token decimals. slippage_bps (int): Slippage in basis points. swap_mode (str): Swap mode, either 'ExactIn' or 'ExactOut'. priority_level (int): Priority level for the transaction fee. only_direct_routes (bool): Limit to direct routes only. 
as_legacy_transaction (bool): Use legacy transaction format. exclude_dexes (list): List of DEXes to exclude. max_accounts (int): Max number of accounts involved. Returns: str: Serialized transaction data for the swap. """ # Get a quote from Jupiter quote_url = self.ENDPOINT_APIS_URL['QUOTE'] + "inputMint=" + input_mint + "&outputMint=" + output_mint + "&amount=" + str(amount) + "&swapMode=" + swap_mode + "&onlyDirectRoutes=" + str(only_direct_routes).lower() + "&asLegacyTransaction=" + str(as_legacy_transaction).lower() if slippage_bps: quote_url += "&slippageBps=" + str(slippage_bps) if exclude_dexes: quote_url += "&excludeDexes=" + ','.join(exclude_dexes).lower() if max_accounts: quote_url += "&maxAccounts=" + str(max_accounts) quote_response = httpx.get(url=quote_url).json() try: quote_response['routePlan'] except: raise Exception(quote_response['error']) # Prepare transaction parameters fees = "auto" if priority_fee == 0 else priority_fee pair = Keypair.from_bytes(base58.b58decode(self.pk)) pubK = pair.pubkey().__str__() transaction_parameters = { "quoteResponse":quote_response, "userPublicKey": pubK, "wrapAndUnwrapSol": True, "computeUnitPriceMicroLamports":fees } # This will raise an error if data isn't JSON serializable json_string = json.dumps(transaction_parameters) validated_data = json.loads(json_string) response = httpx.post(url=self.ENDPOINT_APIS_URL['SWAP'], json=validated_data) response_data = response.json() # headers = { # 'Content-Type': 'application/json', # 'Accept': 'application/json' # } # response = requests.request("POST", self.ENDPOINT_APIS_URL['SWAP'], headers=headers, data=validated_data) # response_data = response.json() # result = response_data['swapTransaction'] # # # Send the swap request to Jupiter # async with httpx.AsyncClient() as client: # response = await client.post( # self.ENDPOINT_APIS_URL['SWAP'], # json=validated_data # ) # response_data = response.json() result = response_data['swapTransaction'] try: 
response_data['swapTransaction'] return response_data['swapTransaction'] except: raise Exception(response_data['error']) # Return the serialized transaction return result async def follow_move(self,move): try: your_balances = await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False) your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) if your_balance_info is not None: # Use the balance print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}") else: print(f"No ballance found for {move['symbol_in']}. Skipping move.") await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") return your_balance = your_balance_info['amount'] token_info = DEX.TOKENS_INFO.get(move['token_in']) token_name_in = token_info.get('symbol') or await SAPI.get_token_metadata_symbol(move['token_in']) token_name_out = DEX.TOKENS_INFO[move['token_out']].get('symbol') or await SAPI.get_token_metadata_symbol(move['token_out']) if not your_balance: msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move." 
logging.warning(msg) await telegram_utils.send_telegram_message(msg) return if FOLLOW_AMOUNT == 'percentage': # Calculate the amount to swap based on the same percentage as the followed move if move.get('percentage_swapped') is None: followed_ballances = await DEX.get_wallet_balances(FOLLOWED_WALLET, doGetTokenName=False) followed_ballance = next((balance for balance in followed_ballances.values() if balance['address'] == move['token_in']), None) if followed_ballance is not None: # Use the balance print(f"Followed balance: {followed_ballance['amount']} {move['symbol_in']}") move['percentage_swapped'] = (move['amount_in']/followed_ballance['amount']) * 100 amount_to_swap = your_balance * (move['percentage_swapped'] / 100) else: #fallback to 100 USD amount_to_swap = 100 else: amount_to_swap = your_balance * (move['percentage_swapped'] / 100) elif FOLLOW_AMOUNT == 'exact': amount_to_swap = move['amount_in'] else: try: fixed_amount = float(FOLLOW_AMOUNT) # un USD fixed_amount_in_token = fixed_amount / move["token_in_price"] amount_to_swap = min(fixed_amount_in_token, your_balance) except ValueError: msg = f"Move not followed:\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number." logging.warning(msg) await telegram_utils.send_telegram_message(msg) return amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have decimals = token_info.get('decimals') # Convert to lamports # if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9 amount_lamports = int(amount_to_swap * 10**decimals) logging.debug(f"Calculated amount in lamports: {amount_lamports}") if your_balance < amount_to_swap: # should not happen msg = ( f"Warning:\n" f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount_lamports}).\n This will probably fail. But we will try anyway." 
) logging.warning(msg) await telegram_utils.send_telegram_message(msg) try: try: notification = ( f"Initiating move:\n" f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}" + (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "") ) # logging.info(notification) # error_logger.info(notification) # await telegram_utils.send_telegram_message(notification) except Exception as e: logging.error(f"Error sending notification: {e}") if self.pk is None: self.pk = await get_pk() for retry in range(2): try: private_key = Keypair.from_bytes(base58.b58decode(self.pk)) async_client = AsyncClient(SOLANA_WS_URL) jupiter = Jupiter(async_client, private_key) # https://station.jup.ag/api-v6/post-swap #transaction_data = await jupiter.swap( transaction_data = await self.swap_on_jupiter( input_mint=move['token_in'], output_mint=move['token_out'], amount=amount_lamports, slippage_bps=300, # Increased to 3% #priority_fee= 100_000 commented for auto ) logging.info(f"Initiating move. 
Transaction data:\n {transaction_data}") raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data)) # working - no priority fee signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message)) signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature]) opts = TxOpts( skip_preflight=False, preflight_commitment=Processed, ) # send the transaction result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts) transaction_id = json.loads(result.to_json())['result'] notification = f"Follow {move.get('type', 'SWAP').upper()} Success:\nTransaction: swapping {amount_to_swap:.2f} {token_name_in}" logging.info(notification) await telegram_utils.send_telegram_message(notification) tx_details = await SAPI.get_transaction_details_with_retry(transaction_id, retry_delay=5, max_retries=10, backoff=False) if tx_details is not None: break else: logging.warning(f"Failed to get transaction details for {transaction_id}.\n Probably transaction failed. 
Retrying again...") await asyncio.sleep(3) except Exception as e: decoded_data = ''# base64.b64decode(transaction_data) error_message = f"Move Failed:\n{str(e)}\n{decoded_data}\n{move}" logging.error(error_message) # log the errors to /logs/errors.log # error_logger.error(error_message) # error_logger.exception(e) await telegram_utils.send_telegram_message(error_message) amount_to_swap = int(amount_to_swap * 0.9) await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False) try: if tx_details is None: logging.info(f"Failed to get transaction details for {transaction_id}") notification = ( f"Move Followed, failed to get transaction details.\n" f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['token_in']}) " f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n" f"\n\nTransaction: solscan.io" # log_successful_swap () ) else: notification = ( f"Move Followed:\n" f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['symbol_in']}) " f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n" f"for {tx_details['amount_out']:.2f} {token_name_out}" # f"Amount In USD: {tr_details['amount_in_USD']}\n" f"\n\nTransaction: solscan.io" ) logging.info(notification) await telegram_utils.send_telegram_message(notification) except Exception as e: logging.error(f"Error sending notification: {e}") except Exception as e: error_message = f"Swap Follow Error:\n{str(e)}" logging.error(error_message) # log the errors to /logs/errors.log # error_logger.error(error_message) # error_logger.exception(e) # if error_message contains 'Program log: Error: insufficient funds' if 'insufficient funds' in error_message: await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. 
class SolanaDEX:
    """Wallet balance and token price helper for the followed and own wallets.

    Keeps three in-memory maps:

    * ``TOKENS_INFO``     - mint -> cached metadata (symbol, price, decimals,
      holdedAmount); loaded from and persisted to ``cache/token_info.json``.
    * ``TOKEN_PRICES``    - mint -> last price returned by get_token_prices.
    * ``TOKEN_ADDRESSES`` - mint -> converted balance entry for every token
      that currently has a positive value in either wallet.
    """

    # Mint that get_token_prices pins to a price of 1.0 without any lookup.
    _PEGGED_MINT = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"

    def __init__(self, DISPLAY_CURRENCY: str):
        """Create the RPC client and load the token metadata cache if present.

        Args:
            DISPLAY_CURRENCY: currency code used for CoinGecko quotes and in
                the human-readable wallet summary.
        """
        self.DISPLAY_CURRENCY = DISPLAY_CURRENCY
        self.solana_client = AsyncClient("https://api.mainnet-beta.solana.com")
        self.TOKENS_INFO = {}
        self.TOKEN_PRICES = {}
        self.TOKEN_ADDRESSES = {}
        self.FOLLOWED_WALLET_VALUE = 0
        self.YOUR_WALLET_VALUE = 0
        self.token_info_path = os.path.normpath(
            os.path.join(script_dir, '..', 'cache', 'token_info.json')
        )
        try:
            with open(self.token_info_path, 'r', encoding='utf-8') as f:
                cached = json.load(f)
            if isinstance(cached, dict):
                self.TOKENS_INFO = cached
            else:
                # Every other method indexes TOKENS_INFO by mint, so anything
                # but a JSON object would break them later on.
                logging.error("Error loading token info: cache file is not a JSON object")
        except (OSError, ValueError) as e:
            # Best effort: a missing or corrupt cache just means starting empty.
            logging.error(f"Error loading token info: {str(e)}")

    async def get_token_prices(self, token_addresses: List[str]) -> Dict[str, float]:
        """Return a price for every requested token, trying several sources.

        Sources are queried in order (CoinGecko, Jupiter, DexScreener,
        Raydium, Orca); each one is only asked about tokens that no earlier
        source priced, so an earlier answer is never overwritten. Tokens no
        source knows get 0.0. The price (and, when missing, the symbol) is
        also written into ``TOKENS_INFO``.

        Args:
            token_addresses: mint addresses to price.

        Returns:
            Mapping of every requested address to its price.
        """
        prices = {addr: 1.0 for addr in token_addresses if addr == self._PEGGED_MINT}
        # dict.fromkeys de-duplicates while keeping the caller's order.
        remaining_tokens = [addr for addr in dict.fromkeys(token_addresses) if addr not in prices]

        sources = (
            self.get_prices_from_coingecko,
            self.get_prices_from_jupiter,
            self.get_prices_from_dexscreener,
            self.get_prices_from_raydium,
            self.get_prices_from_orca,
        )
        for fetch_prices in sources:
            # Always compare against the accumulated result, not against a
            # single source, so already-priced tokens are not fetched again.
            missing_tokens = [addr for addr in remaining_tokens if addr not in prices]
            if not missing_tokens:
                break
            prices.update(await fetch_prices(missing_tokens))

        for token in set(token_addresses) - set(prices.keys()):
            prices[token] = 0.0
            logging.warning(f"Price not found for token {token}. Setting to 0.")

        for token, price in prices.items():
            token_info = self.TOKENS_INFO.setdefault(token, {})
            if 'symbol' not in token_info:
                token_info['symbol'] = await SAPI.get_token_metadata_symbol(token)
            token_info['price'] = price
        return prices

    async def get_prices_from_coingecko(self, token_addresses: List[str]) -> Dict[str, float]:
        """Fetch prices from CoinGecko, one concurrent request per token.

        Returns:
            Mapping of address to price in ``DISPLAY_CURRENCY`` for the tokens
            CoinGecko answered for; failures are logged and omitted.
        """
        base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
        currency = self.DISPLAY_CURRENCY.lower()

        async def fetch_single_price(session, address):
            params = {"contract_addresses": address, "vs_currencies": currency}
            try:
                async with session.get(base_url, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        if address in data and currency in data[address]:
                            return address, data[address][currency]
                    else:
                        logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}")
            except Exception as e:
                logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}")
            return address, None

        async with aiohttp.ClientSession() as session:
            tasks = [fetch_single_price(session, address) for address in token_addresses]
            results = await asyncio.gather(*tasks)
        return {address: price for address, price in results if price is not None}

    async def get_prices_from_dexscreener(self, token_addresses: List[str]) -> Dict[str, float]:
        """Fetch prices from DexScreener using each token's first listed pair.

        Returns:
            Mapping of address to the pair's ``priceUsd``; tokens without a
            usable pair are logged and omitted.
        """
        base_url = "https://api.dexscreener.com/latest/dex/tokens/"
        prices = {}
        try:
            async with aiohttp.ClientSession() as session:
                tasks = [self.fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
                results = await asyncio.gather(*tasks)
            for address, result in zip(token_addresses, results):
                pairs = result.get('pairs') if result else None
                # Use the first pair (usually the most liquid).
                price_usd = pairs[0].get('priceUsd') if pairs else None
                if price_usd is None:
                    # A pair without a price must not abort the whole batch.
                    logging.warning(f"No price data found on DexScreener for token {address}")
                    continue
                prices[address] = float(price_usd)
        except Exception as e:
            logging.error(f"Error fetching token prices from DexScreener: {str(e)}")
        return prices

    async def get_prices_from_jupiter(self, token_addresses: List[str]) -> Dict[str, float]:
        """Fetch prices for all given tokens from Jupiter in a single request.

        Returns:
            Mapping of address to price for every entry of the response's
            ``data`` object that carries a ``price`` field.
        """
        url = "https://price.jup.ag/v4/price"
        params = {"ids": ",".join(token_addresses)}
        prices = {}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, params=params) as response:
                    if response.status == 200:
                        data = await response.json()
                        for address, price_info in data.get('data', {}).items():
                            if 'price' in price_info:
                                prices[address] = float(price_info['price'])
                    else:
                        logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}")
        except Exception as e:
            logging.error(f"Error fetching token prices from Jupiter: {str(e)}")
        return prices

    async def get_prices_from_raydium(self, token_addresses: List[str]) -> Dict[str, float]:
        """Fetch Raydium's price table and pick out the requested tokens.

        Returns:
            Mapping of address to price for the requested tokens present in
            the response.
        """
        url = "https://api.raydium.io/v2/main/price"
        prices = {}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        data = await response.json()
                        for address in token_addresses:
                            if address in data:
                                prices[address] = float(data[address])
                    else:
                        logging.error(f"Failed to get token prices from Raydium. Status: {response.status}")
        except Exception as e:
            logging.error(f"Error fetching token prices from Raydium: {str(e)}")
        return prices

    async def get_prices_from_orca(self, token_addresses: List[str]) -> Dict[str, float]:
        """Fetch Orca's token list and pick out the requested tokens.

        Returns:
            Mapping of mint to price for the requested tokens that appear in
            the response with a price.
        """
        url = "https://api.orca.so/allTokens"
        prices = {}
        wanted = set(token_addresses)  # O(1) membership while scanning the full list
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        data = await response.json()
                        for token_info in data:
                            mint = token_info.get('mint')
                            price = token_info.get('price')
                            # Skip entries without a price instead of failing the batch.
                            if mint in wanted and price is not None:
                                prices[mint] = float(price)
                    else:
                        logging.error(f"Failed to get token prices from Orca. Status: {response.status}")
        except Exception as e:
            logging.error(f"Error fetching token prices from Orca: {str(e)}")
        return prices

    @staticmethod
    async def fetch_token_data(session, url):
        """GET ``url`` on ``session`` and return the decoded JSON body.

        Returns:
            The parsed JSON on HTTP 200, otherwise ``None`` (the failure is
            logged).
        """
        try:
            async with session.get(url) as response:
                if response.status == 200:
                    return await response.json()
                logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
                return None
        except Exception as e:
            logging.error(f"Error fetching data from {url}: {str(e)}")
            return None

    async def get_sol_price(self) -> float:
        """Return the price of wrapped SOL, or 0.0 when no source priced it."""
        sol_address = "So11111111111111111111111111111111111111112"  # Solana's wrapped SOL address
        prices = await self.get_token_prices([sol_address])
        return prices.get(sol_address, 0.0)

    async def _parse_token_account(self, account, doGetTokenName):
        """Turn one parsed token account into a balance entry.

        Updates ``TOKENS_INFO`` for the mint as a side effect.

        Returns:
            The balance dict, or ``None`` when the account has an unexpected
            shape or does not hold more than one whole token.
        """
        parsed_data = account.account.data.parsed
        info = parsed_data.get('info') if isinstance(parsed_data, dict) else None
        if not (isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info):
            logging.warning(f"Unexpected data format for account: {account}")
            return None

        mint = info['mint']
        decimals = int(info['tokenAmount']['decimals'])
        amount = int(info['tokenAmount']['amount']) / 10 ** decimals
        # NOTE(review): the threshold is one whole token, not zero, although
        # the summary log speaks of "non zero" accounts - confirm intent.
        if not amount > 1:
            return None

        token_name = None
        if mint in self.TOKENS_INFO:
            token_name = self.TOKENS_INFO[mint].get('symbol')
        elif doGetTokenName:
            # This class defines no metadata lookup of its own; use the same
            # SAPI helper that get_token_prices relies on.
            token_name = await SAPI.get_token_metadata_symbol(mint) or 'N/A'
            self.TOKENS_INFO[mint] = {'symbol': token_name}
            await asyncio.sleep(2)  # pause between metadata lookups

        # setdefault: the mint may be unknown when name lookup was skipped.
        token_info = self.TOKENS_INFO.setdefault(mint, {})
        token_info['holdedAmount'] = round(amount, decimals)
        token_info['decimals'] = decimals

        logging.debug(f"Account balance for {token_name} ({mint}): {amount}")
        return {
            'name': token_name or 'N/A',
            'address': mint,
            'amount': amount,
            'decimals': decimals,
        }

    async def get_wallet_balances(self, wallet_address, doGetTokenName=True):
        """Return the SPL token and native SOL balances of a wallet.

        Args:
            wallet_address: base58 wallet address.
            doGetTokenName: when True, look up the symbol of mints that are
                not yet in ``TOKENS_INFO``.

        Returns:
            Mapping of mint address (or ``'SOL'`` for the native balance) to a
            dict with ``name``, ``address``, ``amount`` and, for tokens,
            ``decimals``. Errors are logged and whatever was collected so far
            is returned.
        """
        balances = {}
        account_count = 0  # stays 0 if the RPC call itself fails
        logging.info(f"Getting balances for wallet: {wallet_address}")
        try:
            owner = Pubkey.from_string(wallet_address)
            response = await self.solana_client.get_token_accounts_by_owner_json_parsed(
                owner,
                opts=TokenAccountOpts(
                    program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
                ),
                commitment=Confirmed,
            )
            accounts = response.value or []
            account_count = len(accounts)
            for account in accounts:
                try:
                    entry = await self._parse_token_account(account, doGetTokenName)
                except Exception as e:
                    # One malformed account must not hide the others.
                    logging.error(f"Error parsing account data: {str(e)}")
                    continue
                if entry is not None:
                    balances[entry['address']] = entry

            sol_balance = await self.solana_client.get_balance(owner)
            if sol_balance.value is not None:
                balances['SOL'] = {
                    'name': 'SOL',
                    'address': 'SOL',
                    'amount': sol_balance.value / 1e9,  # lamports -> SOL
                }
            else:
                logging.warning(f"SOL balance response missing for wallet: {wallet_address}")
        except Exception as e:
            logging.error(f"Error getting wallet balances: {str(e)}")
        logging.info(f"Found {account_count} ({len(balances)} non zero) token accounts for wallet: {wallet_address}")
        return balances

    async def convert_balances_to_currency(self, balances, sol_price):
        """Attach a ``value`` (amount x price) to a copy of every balance entry.

        Args:
            balances: mapping as returned by get_wallet_balances.
            sol_price: price applied to the entry named ``'SOL'``.

        Returns:
            New mapping with the same keys; ``value`` is ``None`` for tokens
            that are absent from ``TOKEN_PRICES``. The input is not modified.
        """
        converted_balances = {}
        for address, info in balances.items():
            converted_balance = info.copy()
            if info['name'] == 'SOL':
                converted_balance['value'] = info['amount'] * sol_price
            elif address in self.TOKEN_PRICES:
                converted_balance['value'] = info['amount'] * self.TOKEN_PRICES[address]
            else:
                converted_balance['value'] = None
                logging.warning(f"Price not available for token {info['name']} ({address})")
            converted_balances[address] = converted_balance
        return converted_balances

    async def list_initial_wallet_states(self, FOLLOWED_WALLET, YOUR_WALLET):
        """Snapshot both wallets, price their tokens and log a summary.

        Refreshes ``TOKEN_PRICES``, ``TOKEN_ADDRESSES`` and the two wallet
        totals, then persists ``TOKENS_INFO`` to disk.
        """
        followed_wallet_balances = await self.get_wallet_balances(FOLLOWED_WALLET)
        your_wallet_balances = await self.get_wallet_balances(YOUR_WALLET)

        # TOKEN_ADDRESSES maps address -> info dict, so the addresses are its
        # keys; its values are unhashable dicts and cannot go into a set.
        all_token_addresses = list(
            set(followed_wallet_balances) | set(your_wallet_balances) | set(self.TOKEN_ADDRESSES)
        )
        self.TOKEN_PRICES = await self.get_token_prices(all_token_addresses)
        sol_price = await self.get_sol_price()

        followed_converted_balances = await self.convert_balances_to_currency(followed_wallet_balances, sol_price)
        your_converted_balances = await self.convert_balances_to_currency(your_wallet_balances, sol_price)

        self.TOKEN_ADDRESSES = {
            address: info
            for address, info in {**followed_converted_balances, **your_converted_balances}.items()
            if info['value'] is not None and info['value'] > 0
        }
        logging.info(f"Monitoring balances for tokens: {[info['name'] for info in self.TOKEN_ADDRESSES.values()]}")

        followed_wallet_state = []
        self.FOLLOWED_WALLET_VALUE = 0
        for info in followed_converted_balances.values():
            if info['value'] is not None and info['value'] > 0:
                followed_wallet_state.append(
                    f"{info['name']}: {info['value']:.2f} {self.DISPLAY_CURRENCY} ({info['address']})"
                )
                self.FOLLOWED_WALLET_VALUE += info['value']

        your_wallet_state = []
        self.YOUR_WALLET_VALUE = 0
        for info in your_converted_balances.values():
            if info['value'] is not None and info['value'] > 0:
                your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {self.DISPLAY_CURRENCY}")
                self.YOUR_WALLET_VALUE += info['value']

        summary = (
            f"Initial Wallet States (All balances in {self.DISPLAY_CURRENCY}):\n\n"
            f"Followed Wallet ({FOLLOWED_WALLET}):\n"
            f"{chr(10).join(followed_wallet_state)}\n"
            f"Total Value: {self.FOLLOWED_WALLET_VALUE:.2f} {self.DISPLAY_CURRENCY}\n\n"
            f"Your Wallet ({YOUR_WALLET}):\n"
            f"{chr(10).join(your_wallet_state)}\n"
            f"Total Value: {self.YOUR_WALLET_VALUE:.2f} {self.DISPLAY_CURRENCY}\n\n"
            f"Monitored Tokens:\n"
            f"{', '.join([self.safe_get_property(info, 'name') for info in self.TOKEN_ADDRESSES.values()])}"
        )
        logging.info(summary)

        # save token info to file
        await self.save_token_info()

    async def save_token_info(self):
        """Write ``TOKENS_INFO`` to the JSON cache file, creating its folder."""
        cache_dir = os.path.dirname(self.token_info_path)
        if cache_dir:
            # The cache folder may not exist yet on a fresh checkout.
            os.makedirs(cache_dir, exist_ok=True)
        with open(self.token_info_path, 'w', encoding='utf-8') as f:
            json.dump(self.TOKENS_INFO, f, indent=2)

    @staticmethod
    def safe_get_property(obj, prop):
        """Return ``obj[prop]``, or ``'N/A'`` when the key is absent."""
        return obj.get(prop, 'N/A')