# --- Imports -----------------------------------------------------------------
# NOTE(review): the import order below is significant. Several names are
# imported more than once from different packages and the LAST import wins.
import asyncio
import websockets
import json
from flask import Flask, render_template, request, jsonify
from solana.rpc.async_api import AsyncClient
from solana.transaction import Signature  # rebound below by solders.signature.Signature
from solana.rpc.websocket_api import connect
from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.commitment import Confirmed, Processed
from solana.transaction import Transaction  # rebound below by solders.transaction.Transaction
from spl.token.client import Token
from base64 import b64decode
import base58
from solders.rpc.requests import GetTransaction
from solders.signature import Signature
from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.transaction import VersionedTransaction
from solders.transaction import Transaction
from solders.message import Message
from solders.instruction import Instruction
from solders.hash import Hash
from solders.instruction import CompiledInstruction
from solders import message
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
from dexscreener import DexscreenerClient
from telegram import Bot
from telegram.constants import ParseMode
import datetime
import logging
from logging.handlers import RotatingFileHandler
import base64
import os
from dotenv import load_dotenv,set_key
import aiohttp
from typing import List, Dict
import requests
import threading
import re
from typing import List, Dict, Any, Tuple

# --- Environment -------------------------------------------------------------
# Load variables from the default .env file, then from a separate secrets file.
load_dotenv()
load_dotenv('.env.secret') # ToDo - make it work

# --- Logging -----------------------------------------------------------------
# Module-level logger; the root logger is configured at INFO level.
logger = logging.getLogger(__name__)
# logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO)

# Set up error logger
# A dedicated 'error_logger' that writes ERROR-and-above records to
# ./logs/error.log. The file rotates at 10 MiB and keeps 5 backups.
log_dir = './logs'
log_file = os.path.join(log_dir, 'error.log')
os.makedirs(log_dir, exist_ok=True)  # the handler needs the directory to exist
error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5)
error_file_handler.setLevel(logging.ERROR)
error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') )
error_logger = logging.getLogger('error_logger')
# Both the logger and its handler are set to ERROR, so records below ERROR
# sent to error_logger are not written to the file by this handler.
error_logger.setLevel(logging.ERROR)
error_logger.addHandler(error_file_handler)

app = Flask(__name__)

ENV_FILE = '.env'


async def save_subscription_id(subscription_id):
    """Log the given subscription id.

    Persisting the id to the .env file is disabled (see the commented-out
    ``set_key`` call), so this currently only writes a log record.
    """
    # storing subscription id in .env file disabled
    # set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id))
    logger.info(f"Saved subscription ID: {subscription_id}")


async def load_subscription_id():
    """Return the ``SUBSCRIPTION_ID`` environment variable as an int, or None if unset/empty."""
    subscription_id = os.getenv("SUBSCRIPTION_ID")
    return int(subscription_id) if subscription_id else None


# Function to find the latest log file
def get_latest_log_file():
    """Return the path of the most recently created ``log_*`` file in ./logs.

    Returns:
        The file path, or None when the directory cannot be read or holds no
        matching file.
    """
    log_dir = './logs'
    try:
        # Only consider swap log dumps, e.g. log_20241005_004103_143116.json
        files = [
            f for f in os.listdir(log_dir)
            if f.startswith('log_') and os.path.isfile(os.path.join(log_dir, f))
        ]
        if not files:
            # Explicit guard instead of relying on max() raising on an empty list.
            return None
        latest_file = max(files, key=lambda x: os.path.getctime(os.path.join(log_dir, x)))
        return os.path.join(log_dir, latest_file)
    except OSError as e:
        logging.error(f"Error fetching latest log file: {e}")
        return None


# Flask route to retry processing the last log
@app.route('/retry-last-log', methods=['GET'])
def retry_last_log():
    """Re-run ``process_log`` on the newest saved log dump.

    Returns a JSON body with status 200 on success, 404 when no log dump
    exists, and 500 when loading or processing the dump raises.
    """
    latest_log_file = get_latest_log_file()
    if not latest_log_file:
        return jsonify({"error": "No log files found"}), 404

    try:
        with open(latest_log_file, 'r') as f:
            log = json.load(f)
        # This is a synchronous Flask view, so drive the coroutine to
        # completion on a fresh event loop.
        asyncio.run(process_log(log))
        return jsonify({"status": "Log dump processed successfully"}), 200
    except Exception as e:
        logging.error(f"Error processing log dump: {e}")
        return jsonify({"error": "Failed to process log"}), 500


# Configuration
DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID")
FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET")
YOUR_WALLET = os.getenv("YOUR_WALLET")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
SOLANA_WS_URL = os.getenv("SOLANA_WS_URL")
SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL")
DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD')

# Use the production Solana RPC endpoint
solana_client = AsyncClient(SOLANA_HTTP_URL)
dexscreener_client = DexscreenerClient()

# Initialize Telegram Bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)

# Token addresses (initialize with some known tokens)
TOKEN_ADDRESSES = {
    "SOL": "So11111111111111111111111111111111111111112",
    "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
    "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og",
}

# Per-mint cache (symbol, price, decimals, ...) persisted in ./logs/token_info.json
TOKENS_INFO = {}
try:
    with open('./logs/token_info.json', 'r') as f:
        TOKENS_INFO = json.load(f)
except (OSError, ValueError) as e:
    # Missing/unreadable file or invalid JSON: start with an empty cache.
    logging.error(f"Error loading token info: {str(e)}")


# # # # # # # # # # TELEGRAM # # # # # # # # # #

async def send_telegram_message(message):
    """Send ``message`` (HTML parse mode) to ``DEVELOPER_CHAT_ID``.

    Best-effort: any failure is logged and swallowed so that notification
    problems never interrupt the caller.
    """
    try:
        await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML)
        logging.info(f"Telegram message sent: {message}")
        # logging.info(f"Telegram message dummy sent: {message}")
    except Exception as e:
        logging.error(f"Error sending Telegram message: {str(e)}")


# # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # #

async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
    """Fetch prices for the given mints and record them in ``TOKENS_INFO``.

    CoinGecko is queried first; mints it does not return are looked up on
    DexScreener; anything still missing is priced at 0.0 with a warning.

    Args:
        token_addresses: Mint addresses. Empty/None entries are ignored.

    Returns:
        Mapping of address -> price. Every non-empty requested address is
        present in the result.
    """
    global TOKENS_INFO

    # Drop None/empty entries (callers may pass unresolved mints) and
    # duplicates, keeping order. ",".join() would raise on None.
    wanted = list(dict.fromkeys(addr for addr in token_addresses if addr))
    if not wanted:
        return {}

    prices = await get_prices_from_coingecko(wanted)

    # For tokens not found in CoinGecko, use DexScreener
    missing_tokens = [addr for addr in wanted if addr not in prices]
    if missing_tokens:
        prices.update(await get_prices_from_dexscreener(missing_tokens))

    # If any tokens are still missing, set their prices to 0
    for token in wanted:
        if token not in prices:
            prices[token] = 0.0
            logging.warning(f"Price not found for token {token}. Setting to 0.")

    # Update the cache IN PLACE. Assigning a fresh dict here would wipe the
    # symbol/decimals/name fields that other code reads from TOKENS_INFO.
    for token, price in prices.items():
        if not TOKENS_INFO.get(token, {}).get('symbol'):
            token_name = await get_token_metadata_symbol(token)
            if token_name:
                TOKENS_INFO.setdefault(token, {})['symbol'] = token_name
        TOKENS_INFO.setdefault(token, {})['price'] = price

    return prices


async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
    """Return address -> price in ``DISPLAY_CURRENCY`` from CoinGecko.

    Addresses CoinGecko does not report are simply absent from the result.
    HTTP or network failures are logged and yield an empty dict, so the
    caller can fall back to another price source.
    """
    url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
    currency = DISPLAY_CURRENCY.lower()
    params = {
        "contract_addresses": ",".join(token_addresses),
        "vs_currencies": currency,
    }
    prices = {}

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    logging.error(f"Failed to get token prices from CoinGecko. Status: {response.status}")
                    return prices
                data = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
        logging.error(f"Error getting token prices from CoinGecko: {e}")
        return prices

    for address, price_info in data.items():
        if currency in price_info:
            prices[address] = price_info[currency]
    return prices


async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
    """Return address -> price from DexScreener, fetched concurrently.

    The price is read from the ``priceUsd`` field of the first pair returned
    for each token. Tokens without usable price data are omitted and a
    warning is logged.
    """
    base_url = "https://api.dexscreener.com/latest/dex/tokens/"
    prices = {}

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
        results = await asyncio.gather(*tasks)

    for address, result in zip(token_addresses, results):
        pairs = result.get('pairs') if isinstance(result, dict) else None
        if not pairs:
            logging.warning(f"No price data found on DexScreener for token {address}")
            continue
        pair = pairs[0]  # Use the first pair (usually the most liquid)
        try:
            prices[address] = float(pair['priceUsd'])
        except (KeyError, TypeError, ValueError):
            # A pair without a numeric priceUsd must not abort the whole batch.
            logging.warning(f"No price data found on DexScreener for token {address}")

    return prices


async def fetch_token_data(session, url):
    """GET ``url`` with ``session`` and return the decoded JSON, or None on any failure."""
    try:
        async with session.get(url) as response:
            if response.status == 200:
                return await response.json()
            logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
            return None
    except Exception as e:
        logging.error(f"Error fetching data from {url}: {str(e)}")
        return None


async def get_sol_price() -> float:
    """Return the SOL price in ``DISPLAY_CURRENCY``.

    CoinGecko is tried first. On a non-200 response, a network error or an
    unexpected payload, the DexScreener price of wrapped SOL is used.
    """
    currency = DISPLAY_CURRENCY.lower()
    url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={currency}"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    return data['solana'][currency]
                logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}")
    except (aiohttp.ClientError, asyncio.TimeoutError, KeyError, TypeError, ValueError) as e:
        logging.error(f"Error getting SOL price from CoinGecko: {e}")
    return await get_sol_price_from_dexscreener()


async def get_sol_price_from_dexscreener() -> float:
    """Return the DexScreener price of wrapped SOL, or 0.0 when unavailable."""
    sol_address = "So11111111111111111111111111111111111111112"  # Solana's wrapped SOL address
    prices = await get_prices_from_dexscreener([sol_address])
    return prices.get(sol_address, 0.0)


# # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # #

async def get_token_balance_rpc(wallet_address, token_address):
    """Return the UI balance of ``token_address`` held by ``wallet_address``.

    Uses raw JSON-RPC against ``SOLANA_HTTP_URL``: ``getTokenAccountsByOwner``
    to find the wallet's token accounts for the mint, then
    ``getTokenAccountBalance`` on the first account returned.

    Returns:
        The balance as a float, or 0 when there is no account, no balance,
        or the request fails (failures are logged).
    """
    url = SOLANA_HTTP_URL
    headers = {"Content-Type": "application/json"}

    def _post(payload):
        # Blocking HTTP call; the timeout keeps a stalled endpoint from
        # hanging the worker thread forever.
        response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30)
        response.raise_for_status()  # Raises an error for bad responses
        return response.json()

    accounts_request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getTokenAccountsByOwner",
        "params": [
            wallet_address,
            {"mint": token_address},
            {"encoding": "jsonParsed"},
        ],
    }

    # requests is synchronous: run it in the default executor so the event
    # loop is not blocked while waiting on the RPC node.
    loop = asyncio.get_running_loop()
    try:
        accounts = await loop.run_in_executor(None, _post, accounts_request)
        account_list = (accounts.get('result') or {}).get('value')
        if not account_list:
            logging.debug(f"No account found for {token_address} in {wallet_address}")
            return 0

        first_account = account_list[0]['pubkey']
        balance_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "getTokenAccountBalance",
            "params": [first_account],
        }
        balance = await loop.run_in_executor(None, _post, balance_request)
        value = (balance.get('result') or {}).get('value')
        if not value:
            logging.debug(f"No balance found for {token_address} in {wallet_address}")
            return 0

        ui_amount = value.get('uiAmount')
        if ui_amount is None:
            # uiAmount may be null; derive it from the raw amount and decimals.
            ui_amount = int(value['amount']) / 10 ** int(value['decimals'])
        amount = float(ui_amount)
        logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}")
        return amount
    except requests.exceptions.RequestException as e:
        logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
        return 0
    except (AttributeError, IndexError, KeyError, TypeError, ValueError) as e:
        # Response did not have the expected JSON-RPC shape.
        logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
        return 0


# # # solders/solana libs (solana_client) # # #
# NOTE(review): this import of Pubkey rebinds the name imported earlier in the
# file; keep these imports here so the binding seen by later code is unchanged.
from spl.token._layouts import MINT_LAYOUT
from solana.rpc.api import Client, Pubkey
from spl.token.async_client import AsyncToken
from spl.token.constants import TOKEN_PROGRAM_ID
from borsh_construct import String, CStruct


async def get_token_metadata_symbol(mint_address):
    """Return the symbol for ``mint_address``, filling ``TOKENS_INFO`` on the way.

    A cached entry that already has a ``symbol`` key is returned directly.
    Otherwise the mint account is read via ``getAccountInfo`` (recording
    ``decimals`` and ``tokenName`` when present) and the token metadata is
    fetched with ``get_token_metadata`` and merged into the cache, which is
    then saved with ``save_token_info``.

    Returns:
        The symbol, or None when it cannot be determined or a lookup fails.
    """
    global TOKENS_INFO

    if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]:
        return TOKENS_INFO[mint_address].get('symbol')

    try:
        account_data_result = await solana_jsonrpc("getAccountInfo", mint_address)

        # solana_jsonrpc returns None on RPC errors and 'value' may be null
        # for unknown accounts, so walk the response defensively instead of
        # letting a TypeError skip the metadata lookup below.
        account_value = account_data_result.get('value') if isinstance(account_data_result, dict) else None
        account_data = account_value.get('data') if isinstance(account_value, dict) else None
        parsed = account_data.get('parsed') if isinstance(account_data, dict) else None
        account_info = parsed.get('info') if isinstance(parsed, dict) else None

        if isinstance(account_info, dict):
            if 'decimals' in account_info:
                TOKENS_INFO.setdefault(mint_address, {})['decimals'] = account_info['decimals']
            if 'tokenName' in account_info:
                TOKENS_INFO.setdefault(mint_address, {})['name'] = account_info['tokenName']

        metadata = await get_token_metadata(mint_address)
        if metadata:
            TOKENS_INFO.setdefault(mint_address, {}).update(metadata)
            await save_token_info()
        # TOKENS_INFO[mint_address] = metadata
        # return metadata.get('symbol') or metadata.get('name')
        return TOKENS_INFO.get(mint_address, {}).get('symbol')
    except Exception as e:
        # Best-effort lookup: never let a metadata failure break the caller.
        logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
        return None
METADATA_STRUCT = CStruct( "update_authority" / String, "mint" / String, "name" / String, "symbol" / String, "uri" / String, # ... other fields ... ) import struct def get_token_name_metadata(metadata_account_data): try: # Skip the first 1 + 32 + 32 bytes (1 byte for version, 32 bytes each for update authority and mint) offset = 1 + 32 + 32 # Read the name length (u32) name_length = struct.unpack(" update_authority # stream read less than specified amount, expected 2189641476, found 675 # # Parse metadata # key = data[:4] # update_authority = Pubkey(data[4:36]) # mint = Pubkey(data[36:68]) # name_length = struct.unpack(' 0 and data[0] == 4: # name_len = int.from_bytes(data[1:5], byteorder="little") # name = data[5:5+name_len].decode("utf-8").strip("\x00") # symbol_len = int.from_bytes(data[5+name_len:9+name_len], byteorder="little") # symbol = data[9+name_len:9+name_len+symbol_len].decode("utf-8").strip("\x00") # return {"name": name, "symbol": symbol} except Exception as e: logging.error(f"Error fetching token metadata for {mint_address}: {str(e)}") return None async def get_wallet_balances(wallet_address, doGetTokenName=True): balances = {} logging.info(f"Getting balances for wallet: {wallet_address}") global TOKENS_INFO try: response = await solana_client.get_token_accounts_by_owner_json_parsed( Pubkey.from_string(wallet_address), opts=TokenAccountOpts( program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") ), commitment=Confirmed ) if response.value: for account in response.value: try: parsed_data = account.account.data.parsed if isinstance(parsed_data, dict) and 'info' in parsed_data: info = parsed_data['info'] if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info: mint = info['mint'] decimals = info['tokenAmount']['decimals'] amount = float(info['tokenAmount']['amount'])/10**decimals if amount > 0: if mint in TOKENS_INFO: token_name = TOKENS_INFO[mint].get('symbol') elif doGetTokenName: token_name = await 
get_token_metadata_symbol(mint) or 'N/A' # sleep for 1 second to avoid rate limiting await asyncio.sleep(2) TOKENS_INFO[mint]['holdedAmount'] = amount TOKENS_INFO[mint]['decimals'] = decimals balances[mint] = { 'name': token_name or 'N/A', 'address': mint, 'amount': amount, 'decimals': decimals } # sleep for 1 second to avoid rate limiting logging.debug(f"Balance for {token_name} ({mint}): {amount}") else: logging.warning(f"Unexpected data format for account: {account}") except Exception as e: logging.error(f"Error parsing account data: {str(e)}") sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) if sol_balance.value is not None: balances['SOL'] = { 'name': 'SOL', 'address': 'SOL', 'amount': sol_balance.value / 1e9 } else: logging.warning(f"SOL balance response missing for wallet: {wallet_address}") except Exception as e: logging.error(f"Error getting wallet balances: {str(e)}") logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") return balances async def convert_balances_to_currency(balances , sol_price): converted_balances = {} for address, info in balances.items(): converted_balance = info.copy() # Create a copy of the original info if info['name'] == 'SOL': converted_balance['value'] = info['amount'] * sol_price elif address in TOKEN_PRICES: converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] else: converted_balance['value'] = None # Price not available logging.warning(f"Price not available for token {info['name']} ({address})") converted_balances[address] = converted_balance return converted_balances async def get_swap_transaction_details(tx_signature_str): t = await solana_client.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0) try: parsed_result = { "order_id": None, "token_in": None, "token_out": None, "amount_in": 0, "amount_out": 0, "amount_in_USD": 0, "amount_out_USD": 0, "percentage_swapped": 0 } 
instructions = t.value.transaction.transaction.message.instructions # Parse the swap instruction to extract token addresses, amounts, and types for instruction in instructions: if isinstance(instruction, CompiledInstruction): if instruction.program_id == Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"): parsed_info = instruction.parsed.info mint = parsed_info["mint"] amount = float(parsed_info["tokenAmount"]["amount"]) / (10 ** parsed_info["tokenAmount"]["decimals"]) # Determine token in and token out based on balances if parsed_result["token_in"] is None and amount > 0: parsed_result["token_in"] = mint parsed_result["amount_in"] = amount elif parsed_result["token_out"] is None: parsed_result["token_out"] = mint parsed_result["amount_out"] = amount # Calculate percentage swapped if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0: parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100 return parsed_result except Exception as e: logging.error(f"Error fetching transaction details: {e}") return None # # # RAW Solana API RPC # # # #this is the meat of the application async def get_transaction_details_rpc(tx_signature, readfromDump=False): try: if readfromDump and os.path.exists('./logs/transation_details.json'): with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details transaction_details = json.load(f) return transaction_details else: transaction_details = await solana_jsonrpc("getTransaction", tx_signature) with open('./logs/transation_details.json', 'w') as f: json.dump(transaction_details, f, indent=2) if transaction_details is None: logging.error(f"Error fetching transaction details for {tx_signature}") return None # Initialize default result structure parsed_result = { "order_id": None, "token_in": None, "token_out": None, "amount_in": 0, "amount_out": 0, "amount_in_USD": 0, "amount_out_USD": 0, "percentage_swapped": 0 } # Extract order_id from logs 
log_messages = transaction_details.get("meta", {}).get("logMessages", []) for log in log_messages: if "order_id" in log: parsed_result["order_id"] = log.split(":")[2].strip() break # Extract token transfers from innerInstructions inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) for instruction_set in inner_instructions: for instruction in instruction_set.get('instructions', []): if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked': info = instruction['parsed']['info'] mint = info['mint'] amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals # Determine which token is being swapped in and out based on zero balances if parsed_result["token_in"] is None and amount > 0: parsed_result["token_in"] = mint parsed_result["amount_in"] = amount if parsed_result["token_in"] is None or parsed_result["token_out"] is None: # if we've failed to extract token_in and token_out from the transaction details, try a second method inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) transfers = [] for instruction_set in inner_instructions: for instruction in instruction_set.get('instructions', []): if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']: info = instruction['parsed']['info'] amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount']) decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0 adjusted_amount = amount / (10 ** decimals) # adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0)) transfers.append({ 'mint': info.get('mint'), 'amount': adjusted_amount, 'source': info['source'], 'destination': info['destination'] }) # Identify token_in and token_out if len(transfers) >= 2: parsed_result["token_in"] = 
transfers[0]['mint'] parsed_result["amount_in"] = transfers[0]['amount'] parsed_result["token_out"] = transfers[-1]['mint'] parsed_result["amount_out"] = transfers[-1]['amount'] # If mint is not provided, query the Solana network for the account data if parsed_result["token_in"] is None or parsed_result["token_out"] is None: #for transfer in transfers: # do only first and last transfer for transfer in [transfers[0], transfers[-1]]: if transfer['mint'] is None: # Query the Solana network for the account data account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source']) if 'value' in account_data_result and 'data' in account_data_result['value']: account_data_value = account_data_result['value'] account_data_data = account_data_value['data'] if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: account_data_info = account_data_data['parsed']['info'] if 'mint' in account_data_info: transfer['mint'] = account_data_info['mint'] if parsed_result["token_in"] is None: parsed_result["token_in"] = transfer['mint'] parsed_result["amount_in"] = transfer['amount']/10**6 elif parsed_result["token_out"] is None: parsed_result["token_out"] = transfer['mint'] parsed_result["amount_out"] = transfer['amount']/10**6 pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', []) for balance in pre_balalnces: if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET: parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals'] break # Calculate percentage swapped try: if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0: parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100 except Exception as e: logging.error(f"Error calculating percentage swapped: {e}") return parsed_result except 
requests.exceptions.RequestException as e: print("Error fetching transaction details:", e) async def solana_jsonrpc(method, params = None, jsonParsed = True): # target json example: # data = { # "jsonrpc": "2.0", # "id": 1, # "method": "getTransaction", # "params": [ # tx_signature, # { # "encoding": "jsonParsed", # "maxSupportedTransactionVersion": 0 # } # ] # } # if param is not array, make it array if not isinstance(params, list): params = [params] data = { "jsonrpc": "2.0", "id": 1, "method": method, "params": params or [] } data["params"].append({"maxSupportedTransactionVersion": 0}) if jsonParsed: data["params"][1]["encoding"] = "jsonParsed" try: # url = 'https://solana.drpc.org' response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) response.raise_for_status() # Raises an error for bad responses result = response.json() if not 'result' in result or 'error' in result: print("Error fetching data from Solana RPC:", result) return None return result['result'] except Exception as e: logging.error(f"Error fetching data from Solana RPC: {e}") return None # # # # # # # # # # Functionality # # # # # # # # # # async def list_initial_wallet_states(): global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES global TOKENS_INFO # new followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) your_wallet_balances = await get_wallet_balances(YOUR_WALLET) all_token_addresses = list(set(followed_wallet_balances.keys()) | set(your_wallet_balances.keys()) | set(TOKEN_ADDRESSES.values())) TOKEN_PRICES = await get_token_prices(all_token_addresses) sol_price = await get_sol_price() followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) TOKEN_ADDRESSES = { address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() 
if info['value'] is not None and info['value'] > 0 } logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") followed_wallet_state = [] FOLLOWED_WALLET_VALUE = 0 for address, info in followed_converted_balances.items(): if info['value'] is not None and info['value'] > 0: followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") FOLLOWED_WALLET_VALUE += info['value'] your_wallet_state = [] YOUR_WALLET_VALUE = 0 for address, info in your_converted_balances.items(): if info['value'] is not None and info['value'] > 0: your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") YOUR_WALLET_VALUE += info['value'] message = ( f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" f"Followed Wallet ({FOLLOWED_WALLET}):\n" f"{chr(10).join(followed_wallet_state)}\n" f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" f"Your Wallet ({YOUR_WALLET}):\n" f"{chr(10).join(your_wallet_state)}\n" f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" f"Monitored Tokens:\n" f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}" ) logging.info(message) await send_telegram_message(message) # save token info to file await save_token_info() def safe_get_property(info, property_name, default='Unknown'): if not isinstance(info, dict): return str(default) value = info.get(property_name, default) return str(value) if value is not None else str(default) async def save_token_info(): with open('./logs/token_info.json', 'w') as f: json.dump(TOKENS_INFO, f, indent=2) async def get_transaction_details_with_retry(transaction_id, retry_delay = 5, max_retries = 12): # wait for the transaction to be confirmed # await async_client.wait_for_confirmation(Signature.from_string(transaction_id)) # qwery every 5 seconds for the transaction details untill not None or 30 seconds for _ in range(max_retries): try: tx_details = await 
get_transaction_details_rpc(transaction_id) if tx_details is not None: break except Exception as e: logging.error(f"Error fetching transaction details: {e}") logging.info(f"({_} of {max_retries}) Waiting for transaction details for {transaction_id}") await asyncio.sleep(retry_delay) return tx_details async def save_log(log): try: os.makedirs('./logs', exist_ok=True) timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") filename = f"./logs/log_{timestamp}.json" with open(filename, 'w') as f: json.dump(log, f, indent=2) except Exception as e: logging.error(f"Error saving RPC log: {e}") async def process_log(log_result): if log_result['value']['err']: return logs = log_result['value']['logs'] try: # Detect swap operations in logs swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', 'Program log: Instruction: SwapExactAmountIn'] if any(op in logs for op in swap_operations): # Save the log to a file await save_log(log_result) tx_signature_str = log_result['value']['signature'] before_source_balance = 0 source_token_change = 0 tr_details = { "order_id": None, "token_in": None, "token_out": None, "amount_in": 0, "amount_out": 0, "amount_in_USD": 0, "amount_out_USD": 0, "percentage_swapped": 0 } i = 0 while i < len(logs): log_entry = logs[i] # Check if we found the 'order_id' if tr_details["order_id"] is None and "order_id" in log_entry: # Extract the order_id tr_details["order_id"] = log_entry.split(":")[-1].strip() tr_details["token_in"] = logs[i + 1].split(":")[-1].strip() tr_details["token_out"] = logs[i + 2].split(":")[-1].strip() # Look for the token change amounts after tokens have been found if "source_token_change" in log_entry: parts = log_entry.split(", ") for part in parts: if "source_token_change" in part: tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6 # Assuming 6 decimals elif "destination_token_change" in part: tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6 # 
Assuming 6 decimals i += 1 # calculatte percentage swapped by digging before_source_balance, source_token_change and after_source_balance # "Program log: before_source_balance: 19471871, before_destination_balance: 0, amount_in: 19471871, expect_amount_out: 770877527, min_return: 763168752", # "Program log: after_source_balance: 0, after_destination_balance: 770570049", # "Program log: source_token_change: 19471871, destination_token_change: 770570049", if "before_source_balance" in log_entry: parts = log_entry.split(", ") for part in parts: if "before_source_balance" in part: before_source_balance = float(part.split(":")[-1].strip()) / 10 ** 6 if "source_token_change" in log_entry: parts = log_entry.split(", ") for part in parts: if "source_token_change" in part: source_token_change = float(part.split(":")[-1].strip()) / 10 ** 6 # GET DETAILS FROM TRANSACTION IF NOT FOUND IN LOGS try: if tr_details["token_in"] is None or tr_details["token_out"] is None or tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0: logging.warning("Incomplete swap details found in logs. 
Getting details from transaction") tr_details = await get_transaction_details_info(tx_signature_str, logs) # onlt needed if no details got if before_source_balance > 0 and source_token_change > 0: tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100 #dirty fix for percentage > 100 (decimals 9 but expecting 6) if tr_details["percentage_swapped"] > 100: tr_details["percentage_swapped"] = tr_details["percentage_swapped"] / 1000 # update token info all_token_addresses = list(set([tr_details["token_in"], tr_details["token_out"]])) await get_token_prices(all_token_addresses) try: token_in = TOKENS_INFO[tr_details["token_in"]] token_out = TOKENS_INFO[tr_details["token_out"]] tr_details["symbol_in"] = token_in.get('symbol') tr_details["symbol_out"] = token_out.get('symbol') tr_details['amount_in_USD'] = tr_details['amount_in'] * token_in.get('price', 0) tr_details['amount_out_USD'] = tr_details['amount_out'] * token_out.get('price', 0) except Exception as e: logging.error(f"Error fetching token prices: {e}") message_text = ( f"Swap detected: \n" f"Token In: {tr_details['symbol_in']} ({tr_details['token_in']})\n" f"Token Out: {tr_details['symbol_out']} ({tr_details['token_out']})\n" f"Amount In USD: {tr_details['amount_in_USD']:.2f}\n" f"Percentage Swapped: {tr_details['percentage_swapped']:.2f}%" ) await send_telegram_message(message_text) await follow_move(tr_details) await save_token_info() except Exception as e: logging.error(f"Error aquiring log details and following: {e}") return except Exception as e: logging.error(f"Error processing log: {e}") # "Program log: Instruction: Swap2", # "Program log: order_id: 13985890735038016", # "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn", source # "Program log: EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", target # "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410", # 
# (continuation of the sample "Swap2" program-log listing that starts above)
# "Program log: after_source_balance: 0, after_destination_balance: 472509072",
# "Program log: source_token_change: 58730110139, destination_token_change: 270131294",


async def get_transaction_details_info(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]:
    """Fetch the details of a swap transaction and enrich them with USD values.

    Args:
        tx_signature_str: Signature of the transaction to look up.
        logs: Program log lines of the transaction. Currently unused; kept so
            existing callers keep working.

    Returns:
        The dict produced by ``get_transaction_details_with_retry`` with
        ``amount_in_USD`` and ``amount_out_USD`` added, plus
        ``percentage_swapped`` when it could be computed.

    Raises:
        ValueError: If no transaction details could be retrieved.
    """
    tr_info = await get_transaction_details_with_retry(tx_signature_str)
    if tr_info is None:
        # follow_move() treats None as "details unavailable"; fail with a clear
        # message here instead of an opaque TypeError on the first subscript.
        raise ValueError(f"Transaction details unavailable for {tx_signature_str}")

    # Fetch token prices
    token_prices = await get_token_prices([tr_info['token_in'], tr_info['token_out']])

    # Calculate USD values (a token without a known price counts as 0)
    tr_info['amount_in_USD'] = tr_info['amount_in'] * token_prices.get(tr_info['token_in'], 0)
    tr_info['amount_out_USD'] = tr_info['amount_out'] * token_prices.get(tr_info['token_out'], 0)

    # Calculate the percentage of the source balance that was swapped.
    # ToDo: fix decimals for percentage
    try:
        if tr_info['before_source_balance'] > 0:
            tr_info['percentage_swapped'] = (tr_info['amount_in'] / tr_info['before_source_balance']) * 100
        else:
            # No usable starting balance: fall back to a fixed 50%.
            tr_info['percentage_swapped'] = 50
    except Exception as e:
        # 'percentage_swapped' is deliberately left unset here; the caller may
        # still derive it from the program logs.
        logging.error(f"Error calculating percentage swapped: {e}")

    return tr_info


def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
    """Return the pre-transaction balance recorded for ``token``.

    Looks through ``meta.preTokenBalances`` of ``transaction_details`` for the
    first entry whose ``mint`` equals ``token`` and returns its
    ``uiTokenAmount.amount`` converted to ``float``. Returns ``0.0`` when the
    token is not listed or when ``meta`` / ``preTokenBalances`` is missing or
    ``None``.
    """
    # `or {}` / `or []` also cover keys that are present but set to None.
    meta = transaction_details.get('meta') or {}
    for balance in meta.get('preTokenBalances') or []:
        if balance['mint'] == token:
            return float(balance['uiTokenAmount']['amount'])
    return 0.0


async def _submit_follow_swap(move, amount: int, keypair):
    """Request one Jupiter swap, sign it, send it, and return the transaction id.

    The RPC client is closed when this helper returns or raises.
    """
    # NOTE(review): the RPC client is created with SOLANA_WS_URL, exactly as
    # before - confirm that this URL is valid for JSON-RPC requests.
    async with AsyncClient(SOLANA_WS_URL) as async_client:
        jupiter = Jupiter(async_client, keypair)
        transaction_data = await jupiter.swap(
            input_mint=move['token_in'],
            output_mint=move['token_out'],
            amount=amount,
            slippage_bps=100,  # Increased to 1%
        )
        error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}")

        raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
        signature = keypair.sign_message(message.to_bytes_versioned(raw_transaction.message))
        signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
        opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)

        # send the transaction
        result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
    return json.loads(result.to_json())['result']


async def follow_move(move):
    """Mirror a swap of the followed wallet with our own wallet.

    Swaps the same percentage of our ``move['token_in']`` balance into
    ``move['token_out']`` through Jupiter and reports progress via logging and
    Telegram. Errors during the swap are logged and not re-raised.

    Args:
        move: Swap description; this function reads ``token_in``,
            ``token_out``, ``percentage_swapped`` and ``symbol_in``.
    """
    your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
    your_balance_info = next(
        (balance for balance in your_balances.values() if balance['address'] == move['token_in']),
        None,
    )
    # The token may be absent from our wallet altogether.
    your_balance = your_balance_info['amount'] if your_balance_info else 0

    if not your_balance:
        msg = f"Move Failed:\nNo balance found for token {move['token_in']}. Cannot follow move."
        logging.warning(msg)
        await send_telegram_message(msg)
        return

    # Either token may be missing from the cache; fall back to an empty record.
    token_info = TOKENS_INFO.get(move['token_in']) or {}
    token_out_info = TOKENS_INFO.get(move['token_out']) or {}
    # NOTE(review): the input side falls back to get_token_metadata() while the
    # output side uses get_token_metadata_symbol() - confirm this is intended.
    token_name_in = token_info.get('symbol') or await get_token_metadata(move['token_in'])
    token_name_out = token_out_info.get('symbol') or await get_token_metadata_symbol(move['token_out'])

    # Calculate the amount to swap based on the same percentage as the followed move
    amount_to_swap = your_balance * (move['percentage_swapped'] / 100)

    if amount_to_swap > your_balance:
        msg = (
            f"Warning:\n"
            f"We have {your_balance:.6f} {token_name_in}. Insufficient balance to swap {amount_to_swap:.6f} ({move['token_in']}). This will probably fail. But we will try anyway."
        )
        logging.warning(msg)
        await send_telegram_message(msg)

    try:
        # Convert to the token's smallest unit:
        # if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9
        decimals = token_info.get('decimals')
        if decimals is None:
            raise ValueError(f"Unknown decimals for token {move['token_in']}")
        amount = int(amount_to_swap * 10**decimals)

        try:
            notification = (
                f"Initiating move:\n (decimals: {decimals})\n"
                f"Swapping {move['percentage_swapped']:.2f}% ({amount_to_swap:.2f}) {token_name_in} for {token_name_out}"
            )
            logging.info(notification)
            error_logger.info(notification)
            await send_telegram_message(notification)
        except Exception as e:
            logging.error(f"Error sending notification: {e}")

        # The key does not change between attempts, so parse it once.
        private_key = Keypair.from_bytes(base58.b58decode(pk))
        transaction_id = None
        tx_details = None
        # NOTE(review): a second attempt submits a brand-new swap when the
        # details of the first one cannot be fetched.
        for _attempt in range(2):
            transaction_id = await _submit_follow_swap(move, amount, private_key)
            print(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}")
            try:
                await send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
            except Exception as e:
                # The transaction is already sent; a failed notification must
                # not abort the follow-up.
                logging.error(f"Error sending notification: {e}")

            tx_details = await get_transaction_details_with_retry(transaction_id)
            if tx_details is not None:
                break
            logging.warning(f"Failed to get transaction details for {transaction_id}. Probably transaction failed. Retrying again...")
            await asyncio.sleep(5)

        await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)

        try:
            if tx_details is None:
                logging.info(f"Failed to get transaction details for {transaction_id}")
                notification = (
                    f"Move Followed:\n"
                    f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['token_in']}) "
                    f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
                    f"\n\nTransaction: {transaction_id}"
                )
            else:
                notification = (
                    f"Move Followed:\n"
                    f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['symbol_in']}) "
                    f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
                    f"for {tx_details['amount_out']:.2f} {token_name_out}"
                    f"\n\nTransaction: {transaction_id}"
                )
            logging.info(notification)
            await send_telegram_message(notification)
        except Exception as e:
            logging.error(f"Error sending notification: {e}")

    except Exception as e:
        error_message = f"Swap Follow Error:\n{str(e)}"
        logging.error(error_message)
        # log the errors to /logs/errors.log
        error_logger.error(error_message)
        error_logger.exception(e)


# Helper functions (implement these according to your needs)

async def subscribe_to_wallet():
    """Subscribe to the followed wallet's logs and dispatch each one to ``process_log``.

    Opens a websocket to ``SOLANA_WS_URL``, sends a ``logsSubscribe`` request
    mentioning ``FOLLOWED_WALLET`` and handles every notification in its own
    task. Runs forever: when the connection drops it reconnects with
    exponential backoff (5 s doubling up to 60 s); the delay is reset once a
    subscription is confirmed.
    """
    # Other public endpoints that could be used instead:
    #   wss://api.mainnet-beta.solana.com, wss://solana-api.projectserum.com,
    #   wss://rpc.ankr.com/solana, wss://mainnet.rpcpool.com
    uri = SOLANA_WS_URL  # wss://api.mainnet-beta.solana.com
    initial_reconnect_delay = 5  # Start with a 5-second delay
    max_reconnect_delay = 60  # Maximum delay of 60 seconds
    reconnect_delay = initial_reconnect_delay

    # The event loop keeps only weak references to tasks; hold strong ones so
    # log-processing tasks cannot be garbage-collected while running.
    background_tasks = set()

    while True:
        try:
            async with websockets.connect(uri) as websocket:
                logger.info("Connected to Solana websocket")

                subscription_id = await load_subscription_id()

                subscribe_request = {
                    "jsonrpc": "2.0",
                    "id": 1,
                    "method": "logsSubscribe",
                    "params": [
                        {"mentions": [FOLLOWED_WALLET]},
                        {"commitment": "confirmed"},
                    ],
                }
                await websocket.send(json.dumps(subscribe_request))
                logger.info("Subscription request sent")

                while True:
                    try:
                        response = await websocket.recv()
                        response_data = json.loads(response)
                        logger.debug(f"Received response: {response_data}")

                        if 'result' in response_data:
                            subscription_id = response_data['result']
                            await save_subscription_id(subscription_id)
                            logger.info(f"Subscription successful. Subscription id: {subscription_id}")
                            # Healthy again: the next outage starts from the short delay.
                            reconnect_delay = initial_reconnect_delay
                            await send_telegram_message("Connected to Solana network. Watching for transactions now.")
                        elif 'params' in response_data:
                            log = response_data['params']['result']
                            logging.debug(f"Received transaction log: {log}")
                            # Create a new task for processing the log
                            task = asyncio.create_task(process_log(log))
                            background_tasks.add(task)
                            task.add_done_callback(background_tasks.discard)
                        else:
                            logger.warning(f"Unexpected response: {response}")

                    except websockets.exceptions.ConnectionClosedError as e:
                        logger.error(f"Connection closed unexpectedly: {e}")
                        break
                    except json.JSONDecodeError as e:
                        # A malformed frame is skipped; the connection stays up.
                        logger.error(f"Failed to decode JSON: {e}")
                    except Exception as e:
                        logger.error(f"An unexpected error occurred: {e}")
                        break

        except websockets.exceptions.WebSocketException as e:
            logger.error(f"WebSocket error: {e}")
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")

        logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
        await asyncio.sleep(reconnect_delay)

        # Implement exponential backoff
        reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)


# Private key used for signing: taken from the PK environment variable, or
# else from a 'secret.pk' file located next to this script.
pk = os.getenv("PK")
if not pk:
    try:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        with open(os.path.join(script_dir, 'secret.pk'), 'r') as f:
            pk = f.read().strip()
        if pk:
            logging.info("Private key loaded successfully from file.")
        else:
            logging.warning("Private key file is empty.")
    except FileNotFoundError:
        logging.warning("Private key file not found.")
    except Exception as e:
        logging.error(f"Error reading private key file: {str(e)}")

if not pk:
    logging.error("Private key not found in environment variables. Will not be able to sign transactions.")
    # The Telegram warning is sent from main(): send_telegram_message() is a
    # coroutine and cannot be awaited at import time.


async def main():
    """Announce start-up, list the initial wallet states and watch the followed wallet."""
    await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
    if not pk:
        # send TG warning message
        await send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.")
    # Keep a reference so the task is not garbage-collected before it finishes.
    initial_states_task = asyncio.create_task(list_initial_wallet_states())
    await subscribe_to_wallet()


def run_flask():
    """Run the Flask app on port 3001 (blocking)."""
    # Run Flask app without the reloader, so we can run the async main function
    app.run(debug=False, port=3001, use_reloader=False)


if __name__ == '__main__':
    # Start Flask in a separate thread (once - a second server on the same
    # port could not bind).
    flask_thread = threading.Thread(target=run_flask)
    flask_thread.start()

    # Run the async main function on a fresh event loop
    asyncio.run(main())