import asyncio import uvicorn from asgiref.wsgi import WsgiToAsgi import websockets import json from flask import Flask, render_template, request, jsonify from solana.rpc.async_api import AsyncClient from solana.transaction import Signature from solana.rpc.websocket_api import connect from solana.rpc.commitment import Confirmed, Processed from solana.transaction import Transaction from spl.token.client import Token from base64 import b64decode import base58 from solders.rpc.requests import GetTransaction from solders.signature import Signature from solders.pubkey import Pubkey from solders.keypair import Keypair from solders.transaction import VersionedTransaction from solders.transaction import Transaction from solders.message import Message from solders.instruction import Instruction from solders.hash import Hash from solders.instruction import CompiledInstruction from solders import message from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA from dexscreener import DexscreenerClient from solana.rpc.types import TokenAccountOpts, TxOpts import datetime import logging from logging.handlers import RotatingFileHandler import base64 import os from dotenv import load_dotenv,set_key import aiohttp from typing import List, Dict import requests import re from typing import List, Dict, Any, Tuple import random from threading import Thread from modules.webui import init_app from modules.storage import init_db, store_transaction app = Flask(__name__) # config = load_config() load_dotenv() load_dotenv('.env.secret') # Configuration DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") YOUR_WALLET = os.getenv("YOUR_WALLET") TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') FOLLOW_AMOUNT = os.getenv('FOLLOW_AMOUNT', 'percentage') logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) #logging.basicConfig(level=logging.INFO) # Set up error logger log_dir = './logs' log_file = os.path.join(log_dir, 'error.log') os.makedirs(log_dir, exist_ok=True) error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5) error_file_handler.setLevel(logging.ERROR) error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') ) error_file_handler.formatter.converter = time.localtime error_logger = logging.getLogger('error_logger') error_logger.setLevel(logging.ERROR) error_logger.addHandler(error_file_handler) # Set up success logger for accounting CSV class CSVFormatter(logging.Formatter): def __init__(self): super().__init__() self.output = None def format(self, record): if self.output is None: self.output = csv.writer(record.stream) self.output.writerow(['Timestamp', 'Token In', 'Token Out', 'Amount In', 'Amount Out', 'USD Value In', 'USD Value Out', 'Transaction Hash', 'Wallet Address']) self.output.writerow([ self.formatTime(record, self.datefmt), record.token_in, record.token_out, record.amount_in, record.amount_out, record.usd_value_in, record.usd_value_out, record.tx_hash, record.wallet_address ]) return '' def log_successful_swap(token_in, token_out, amount_in, amount_out, usd_value_in, usd_value_out, tx_hash, wallet_address): success_logger_accounting_csv.info('', extra={ 'token_in': token_in, 'token_out': token_out, 'amount_in': amount_in, 'amount_out': amount_out, 'usd_value_in': usd_value_in, 'usd_value_out': usd_value_out, 'tx_hash': tx_hash, 'wallet_address': wallet_address }) success_log_file = os.path.join(log_dir, 'successful_swaps.csv') success_file_handler = RotatingFileHandler(success_log_file, maxBytes=10*1024*1024, backupCount=5) success_file_handler.setFormatter(CSVFormatter()) success_logger_accounting_csv = logging.getLogger('success_logger_accounting_csv') success_logger_accounting_csv.setLevel(logging.INFO) success_logger_accounting_csv.addHandler(success_file_handler) # Function to find the latest log file def get_latest_log_file(): log_dir = './logs' try: # files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))] # filter files mask log_20241005_004103_143116.json files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('log_')] latest_file = max(files, key=lambda x: os.path.getmtime(os.path.join(log_dir, x))) return os.path.join(log_dir, latest_file) except Exception as e: logging.error(f"Error fetching latest log file: {e}") return None # Flask route to retry processing the last log @app.route('/retry', methods=['GET']) @app.route('/retry-last-log', methods=['GET']) async def retry_last_log(): latest_log_file = get_latest_log_file() if not latest_log_file: return jsonify({"error": "No log files found"}), 404 try: logger.info(f"Processing latest log file: {latest_log_file}") with open(latest_log_file, 'r') as f: log = json.load(f) result = await process_log(log) return jsonify({ "file": latest_log_file, "status": "Log dump processed successfully", "result": result }), 200 except Exception as e: logging.error(f"Error processing log dump: {e}") return jsonify({"error": "Failed to process log"}), 500 #const webhookPath = `/tr/${followedWallet.toBase58()}/${logs.signature}`; @app.route('/tr//', methods=['GET', 'POST']) async def transaction_notified(wallet, tx_signature): try: logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}") # Process the transaction # tr = await get_swap_transaction_details(tx_signature) tr = await get_transaction_details_info(tx_signature, []) get_token_metadata_symbol(tr) # ToDo - probably optimize await follow_move(tr['token_in']) await follow_move(tr['token_out']) await save_token_info() return jsonify(tr), 200 except Exception as e: logging.error(f"Error processing transaction: {e}") return jsonify({"error": "Failed to process transaction"}), 500 # Configuration DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") YOUR_WALLET = os.getenv("YOUR_WALLET") TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') # Use the production Solana RPC endpoint solana_client = AsyncClient(SOLANA_HTTP_URL) dexscreener_client = DexscreenerClient() # Initialize Telegram Bot bot = Bot(token=TELEGRAM_BOT_TOKEN) # Token addresses (initialize with some known tokens) TOKEN_ADDRESSES = { "SOL": "So11111111111111111111111111111111111111112", "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og", } TOKENS_INFO = {} try: with open('./logs/token_info.json', 'r') as f: TOKENS_INFO = json.load(f) except Exception as e: logging.error(f"Error loading token info: {str(e)}") # # # # # # # # # # TELEGRAM # # # # # # # # # # if not telegram_utils.bot: try: asyncio.run(telegram_utils.initialize()) except Exception as e: logging.error(f"Error initializing Telegram bot: {str(e)}") # async def telegram_utils.send_telegram_message(message): # try: # await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) # logging.info(f"Telegram message sent: {message}") # # logging.info(f"Telegram message dummy sent: {message}") # except Exception as e: # logging.error(f"Error sending Telegram message: {str(e)}") # # # # # # # # # # DATABASE # # # # # # # # # # # # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # # # # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # # solana_client = AsyncClient(SOLANA_HTTP_URL) async def get_token_balance_rpc(wallet_address, token_address): url = SOLANA_HTTP_URL headers = {"Content-Type": "application/json"} data = { "jsonrpc": "2.0", "id": 1, "method": "getTokenAccountsByOwner", "params": [ wallet_address, { "mint": token_address }, { "encoding": "jsonParsed" } ] } try: response = requests.post(url, headers=headers, data=json.dumps(data)) response.raise_for_status() # Raises an error for bad responses accounts = response.json() if 'result' in accounts and accounts['result']['value']: first_account = accounts['result']['value'][0]['pubkey'] balance_data = { "jsonrpc": "2.0", "id": 1, "method": "getTokenAccountBalance", "params": [ first_account ] } balance_response = requests.post(url, headers=headers, data=json.dumps(balance_data)) balance_response.raise_for_status() balance = balance_response.json() if 'result' in balance and 'value' in balance['result']: amount = float(balance['result']['value']['uiAmount']) logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}") return amount else: logging.debug(f"No balance found for {token_address} in {wallet_address}") return 0 else: logging.debug(f"No account found for {token_address} in {wallet_address}") return 0 except requests.exceptions.RequestException as e: logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}") return 0 # # # solders/solana libs (solana_client) # # # from spl.token._layouts import MINT_LAYOUT from solana.rpc.api import Client, Pubkey from spl.token.async_client import AsyncToken from spl.token.constants import TOKEN_PROGRAM_ID from borsh_construct import String, CStruct METADATA_STRUCT = CStruct( "update_authority" / String, "mint" / String, "name" / String, "symbol" / String, "uri" / String, # ... other fields ... ) import struct def get_token_name_metadata(metadata_account_data): try: # Skip the first 1 + 32 + 32 bytes (1 byte for version, 32 bytes each for update authority and mint) offset = 1 + 32 + 32 # Read the name length (u32) name_length = struct.unpack(" update_authority # stream read less than specified amount, expected 2189641476, found 675 # # Parse metadata # key = data[:4] # update_authority = Pubkey(data[4:36]) # mint = Pubkey(data[36:68]) # name_length = struct.unpack(' 0 and data[0] == 4: # name_len = int.from_bytes(data[1:5], byteorder="little") # name = data[5:5+name_len].decode("utf-8").strip("\x00") # symbol_len = int.from_bytes(data[5+name_len:9+name_len], byteorder="little") # symbol = data[9+name_len:9+name_len+symbol_len].decode("utf-8").strip("\x00") # return {"name": name, "symbol": symbol} except Exception as e: logging.error(f"Error fetching token metadata for {mint_address}: {str(e)}") return None async def get_wallet_balances(wallet_address, doGetTokenName=True): balances = {} logging.info(f"Getting balances for wallet: {wallet_address}") global TOKENS_INFO try: response = await solana_client.get_token_accounts_by_owner_json_parsed( Pubkey.from_string(wallet_address), opts=TokenAccountOpts( program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") ), commitment=Confirmed ) if response.value: for account in response.value: try: parsed_data = account.account.data.parsed if isinstance(parsed_data, dict) and 'info' in parsed_data: info = parsed_data['info'] if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info: mint = info['mint'] decimals = info['tokenAmount']['decimals'] amount = float(info['tokenAmount']['amount'])/10**decimals if amount > 0: if mint in TOKENS_INFO: token_name = TOKENS_INFO[mint].get('symbol') elif doGetTokenName: token_name = await get_token_metadata_symbol(mint) or 'N/A' # sleep for 1 second to avoid rate limiting await asyncio.sleep(2) TOKENS_INFO[mint]['holdedAmount'] = round(amount, decimals) TOKENS_INFO[mint]['decimals'] = decimals balances[mint] = { 'name': token_name or 'N/A', 'address': mint, 'amount': amount, 'decimals': decimals } # sleep for 1 second to avoid rate limiting logging.debug(f"Account balance for {token_name} ({mint}): {amount}") else: logging.warning(f"Unexpected data format for account: {account}") except Exception as e: logging.error(f"Error parsing account data: {str(e)}") sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) if sol_balance.value is not None: balances['SOL'] = { 'name': 'SOL', 'address': 'SOL', 'amount': sol_balance.value / 1e9 } else: logging.warning(f"SOL balance response missing for wallet: {wallet_address}") except Exception as e: logging.error(f"Error getting wallet balances: {str(e)}") logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") return balances async def convert_balances_to_currency(balances , sol_price): converted_balances = {} for address, info in balances.items(): converted_balance = info.copy() # Create a copy of the original info if info['name'] == 'SOL': converted_balance['value'] = info['amount'] * sol_price elif address in TOKEN_PRICES: converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] else: converted_balance['value'] = None # Price not available logging.warning(f"Price not available for token {info['name']} ({address})") converted_balances[address] = converted_balance return converted_balances async def get_swap_transaction_details(tx_signature_str): t = await solana_client.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0) try: parsed_result = { "order_id": None, "token_in": None, "token_out": None, "amount_in": 0, "amount_out": 0, "amount_in_USD": 0, "amount_out_USD": 0, "percentage_swapped": 0 } instructions = t.value.transaction.transaction.message.instructions # Parse the swap instruction to extract token addresses, amounts, and types for instruction in instructions: if isinstance(instruction, CompiledInstruction): if instruction.program_id == Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"): parsed_info = instruction.parsed.info mint = parsed_info["mint"] amount = float(parsed_info["tokenAmount"]["amount"]) / (10 ** parsed_info["tokenAmount"]["decimals"]) # Determine token in and token out based on balances if parsed_result["token_in"] is None and amount > 0: parsed_result["token_in"] = mint parsed_result["amount_in"] = amount elif parsed_result["token_out"] is None: parsed_result["token_out"] = mint parsed_result["amount_out"] = amount # Calculate percentage swapped if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0: parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100 return parsed_result except Exception as e: logging.error(f"Error fetching transaction details: {e}") return None # # # # # # # # # # Functionality # # # # # # # # # # def safe_get_property(info, property_name, default='Unknown'): if not isinstance(info, dict): return str(default) value = info.get(property_name, default) return str(value) if value is not None else str(default) async def save_token_info(): with open('./logs/token_info.json', 'w') as f: json.dump(TOKENS_INFO, f, indent=2) async def get_transaction_details_with_retry(transaction_id, retry_delay = 5, max_retries = 16): # wait for the transaction to be confirmed # await async_client.wait_for_confirmation(Signature.from_string(transaction_id)) # query every 5 seconds for the transaction details until not None or 30 seconds for _ in range(max_retries): try: tx_details = await solanaAPI.get_transaction_details_rpc(transaction_id) if tx_details is not None: break except Exception as e: logging.error(f"Error fetching transaction details: {e}") retry_delay = retry_delay * 1.2 logging.info(f"({_} of {max_retries}) Waiting for transaction details for {transaction_id}. retry in {retry_delay} s.") await asyncio.sleep(retry_delay) retry_delay *= 1.2 return tx_details async def save_log(log): try: os.makedirs('./logs', exist_ok=True) timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") filename = f"./logs/log_{timestamp}.json" with open(filename, 'w') as f: json.dump(log, f, indent=2) except Exception as e: logging.error(f"Error saving RPC log: {e}") PROCESSING_LOG = False async def process_log(log_result): global PROCESSING_LOG tr_details = { "order_id": None, "token_in": None, "token_out": None, "amount_in": 0, "amount_out": 0, "amount_in_USD": 0, "amount_out_USD": 0, "percentage_swapped": 0 } if log_result['value']['err']: return logs = log_result['value']['logs'] try: # Detect swap operations in logs PROCESSING_LOG = True swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', 'Program log: Instruction: SwapExactAmountIn', 'Program log: Instruction: SwapV2'] if any(op in logs for op in swap_operations): # Save the log to a file await save_log(log_result) tx_signature_str = log_result['value']['signature'] before_source_balance = 0 source_token_change = 0 i = 0 while i < len(logs): log_entry = logs[i] # Check if we found the 'order_id' if tr_details["order_id"] is None and "order_id" in log_entry: # Extract the order_id tr_details["order_id"] = log_entry.split(":")[-1].strip() tr_details["token_in"] = logs[i + 1].split(":")[-1].strip() tr_details["token_out"] = logs[i + 2].split(":")[-1].strip() # Look for the token change amounts after tokens have been found if "source_token_change" in log_entry: parts = log_entry.split(", ") for part in parts: if "source_token_change" in part: tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6 # Assuming 6 decimals elif "destination_token_change" in part: tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6 # Assuming 6 decimals i += 1 # calculate percentage swapped by digging before_source_balance, source_token_change and after_source_balance # "Program log: before_source_balance: 19471871, before_destination_balance: 0, amount_in: 19471871, expect_amount_out: 770877527, min_return: 763168752", # "Program log: after_source_balance: 0, after_destination_balance: 770570049", # "Program log: source_token_change: 19471871, destination_token_change: 770570049", if "before_source_balance" in log_entry: parts = log_entry.split(", ") for part in parts: if "before_source_balance" in part: before_source_balance = float(part.split(":")[-1].strip()) / 10 ** 6 if "source_token_change" in log_entry: parts = log_entry.split(", ") for part in parts: if "source_token_change" in part: source_token_change = float(part.split(":")[-1].strip()) / 10 ** 6 # GET DETAILS FROM TRANSACTION IF NOT FOUND IN LOGS try: if tr_details["token_in"] is None or tr_details["token_out"] is None or tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0: logging.warning("Incomplete swap details found in logs. Getting details from transaction") tr_details = await get_transaction_details_info(tx_signature_str, logs) # onlt needed if no details got if before_source_balance > 0 and source_token_change > 0: tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100 #dirty fix for percentage > 100 (decimals 9 but expecting 6) if tr_details["percentage_swapped"] > 100: tr_details["percentage_swapped"] = tr_details["percentage_swapped"] / 1000 # update token info: ToDo: check, but already did # all_token_addresses = list(set([tr_details["token_in"], tr_details["token_out"]])) # await get_token_prices(all_token_addresses) try: token_in = TOKENS_INFO[tr_details["token_in"]] token_out = TOKENS_INFO[tr_details["token_out"]] tr_details["symbol_in"] = token_in.get('symbol') tr_details["symbol_out"] = token_out.get('symbol') tr_details['amount_in_USD'] = tr_details['amount_in'] * token_in.get('price', 0) tr_details['amount_out_USD'] = tr_details['amount_out'] * token_out.get('price', 0) except Exception as e: logging.error(f"Error fetching token prices: {e}") message_text = ( f"Swap detected: \n" f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} ({tr_details['percentage_swapped']:.2f}% ) swapped for " # ({tr_details['token_in']}) ({tr_details['token_out']}) f"{tr_details['symbol_out']} \n" ) await telegram_utils.send_telegram_message(message_text) await follow_move(tr_details) await save_token_info() except Exception as e: logging.error(f"Error aquiring log details and following: {e}") await send_telegram_message(f"Not followed! Error following move.") except Exception as e: logging.error(f"Error processing log: {e}") PROCESSING_LOG = False return tr_details # "Program log: Instruction: Swap2", # "Program log: order_id: 13985890735038016", # "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn", source # "Program log: EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", target # "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410", # "Program log: after_source_balance: 0, after_destination_balance: 472509072", # "Program log: source_token_change: 58730110139, destination_token_change: 270131294", async def get_transaction_details_info(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]: global TOKENS_INFO tr_info = await get_transaction_details_with_retry(tx_signature_str) # Fetch token prices: ToDo: check, but already did # token_prices = await get_token_prices([tr_info['token_in'], tr_info['token_out']]) # # for token, price in token_prices.items(): # # if not token in TOKENS_INFO or not TOKENS_INFO[token].get('symbol'): # # token_name = await get_token_metadata_symbol(token) # # TOKENS_INFO[token] = {'symbol': token_name} # # TOKENS_INFO[token] = {'price': price} # # Calculate USD values # tr_info['amount_in_USD'] = tr_info['amount_in'] * token_prices.get(tr_info['token_in'], 0) # tr_info['amount_out_USD'] = tr_info['amount_out'] * token_prices.get(tr_info['token_out'], 0) # Calculate the percentage of the source balance that was swapped; ToDo: fix decimals for percentage try: tr_info['percentage_swapped'] = (tr_info['amount_in'] / tr_info['before_source_balance']) * 100 if tr_info['before_source_balance'] > 0 else 50 except Exception as e: logging.error(f"Error calculating percentage swapped: {e}") return tr_info def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', []) for balance in pre_balances: if balance['mint'] == token: return float(balance['uiTokenAmount']['amount']) return 0.0 async def follow_move(move): your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) if your_balance_info is not None: # Use the balance print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}") else: print(f"No ballance found for {move['symbol_in']}. Skipping move.") await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") return your_balance = your_balance_info['amount'] token_info = TOKENS_INFO.get(move['token_in']) token_name_in = token_info.get('symbol') or await get_token_metadata(move['token_in']) token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await solanaAPI.get_token_metadata_symbol(move['token_out']) if not your_balance: msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move." logging.warning(msg) await telegram_utils.send_telegram_message(msg) return if FOLLOW_AMOUNT == 'percentage': # Calculate the amount to swap based on the same percentage as the followed move amount_to_swap = your_balance * (move['percentage_swapped'] / 100) elif FOLLOW_AMOUNT == 'exact': amount_to_swap = move['amount_in'] else: try: fixed_amount = float(FOLLOW_AMOUNT) amount_to_swap = min(fixed_amount, your_balance) except ValueError: msg = f"Move not followed:\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number." logging.warning(msg) await send_telegram_message(msg) return amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have amount = int(amount) logging.debug(f"Calculated amount in lamports: {amount}") decimals = token_info.get('decimals') # Convert to lamports # if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9 amount = int(amount_to_swap * 10**decimals) if your_balance < amount_to_swap: # should not happen msg = ( f"Warning:\n" f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway." ) logging.warning(msg) await telegram_utils.send_telegram_message(msg) try: try: notification = ( f"Initiating move:\n" f"Swapping {move['percentage_swapped']:.2f}% ({amount_to_swap:.2f}) {token_name_in} for {token_name_out}" ) # logging.info(notification) # error_logger.info(notification) # await telegram_utils.send_telegram_message(notification) except Exception as e: logging.error(f"Error sending notification: {e}") for retry in range(3): try: private_key = Keypair.from_bytes(base58.b58decode(pk)) async_client = AsyncClient(SOLANA_WS_URL) jupiter = Jupiter(async_client, private_key) transaction_data = await jupiter.swap( input_mint=move['token_in'], output_mint=move['token_out'], amount=amount, slippage_bps=300, # Increased to 3% ) logging.info(f"Initiating move. Transaction data:\n {transaction_data}") error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}") raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data)) signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message)) signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature]) opts = TxOpts(skip_preflight=False, preflight_commitment=Processed) # send the transaction result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts) transaction_id = json.loads(result.to_json())['result'] print(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}") # append to notification notification += f"\n\nTransaction: {transaction_id}" await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}") tx_details = await get_transaction_details_with_retry(transaction_id) if tx_details is not None: break else: logging.warning(f"Failed to get transaction details for {transaction_id}. Probably transaction failed. Retrying again...") await asyncio.sleep(3) except Exception as e: error_message = f"Move Failed:\n{str(e)}\n{transaction_data}\n{move}" logging.error(error_message) # log the errors to /logs/errors.log error_logger.error(error_message) error_logger.exception(e) await telegram_utils.send_telegram_message(error_message) amount = amount * 0.75 await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) try: if tx_details is None: logging.info(f"Failed to get transaction details for {transaction_id}") notification = ( f"Move Followed, failed to get transaction details.\n" f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['token_in']}) " f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n" f"\n\nTransaction: {transaction_id}" # log_successful_swap () ) else: notification = ( f"Move Followed:\n" f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['symbol_in']}) " f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n" f"for {tx_details['amount_out']:.2f} {token_name_out}" # f"Amount In USD: {tr_details['amount_in_USD']}\n" f"\n\nTransaction: {transaction_id}" ) logging.info(notification) await send_telegram_message(notification) except Exception as e: logging.error(f"Error sending notification: {e}") except Exception as e: error_message = f"Swap Follow Error:\n{str(e)}" logging.error(error_message) # log the errors to /logs/errors.log error_logger.error(error_message) error_logger.exception(e) \ # if error_message contains 'Program log: Error: insufficient funds' if 'insufficient funds' in error_message: await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.") else: await telegram_utils.send_telegram_message(error_message) # Helper functions SOLANA_ENDPOINTS = [ "wss://api.mainnet-beta.solana.com", # "wss://solana-api.projectserum.com", # "wss://rpc.ankr.com/solana", # "wss://mainnet.rpcpool.com", ] PING_INTERVAL = 30 SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 10 minutes # async def heartbeat(websocket): # while True: # try: # await websocket.ping() # await asyncio.sleep(PING_INTERVAL) # except websockets.exceptions.ConnectionClosed: # break _first_subscription = True _process_task = None async def wallet_watch_loop(): global _first_subscription, _process_task reconnect_delay = 5 max_reconnect_delay = 60 while True: try: try: subscription_id = None current_url = random.choice(SOLANA_ENDPOINTS) async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket: logger.info(f"Connected to Solana websocket: {current_url}") # heartbeat_task = asyncio.create_task(heartbeat(websocket)) while True: if websocket.closed: break subscription_id = await subscribe(websocket) if subscription_id is not None: # await send_telegram_message(f"Solana mainnet connected ({subscription_id})...") if _first_subscription: asyncio.create_task( list_initial_wallet_states()) _first_subscription = False _process_task = asyncio.create_task(process_messages(websocket)) while True: try:# drop subscription now await process_messages(websocket, subscription_id) # await asyncio.run(_process_task) # await asyncio.wait_for(_process_task, timeout=SUBSCRIBE_INTERVAL) except asyncio.TimeoutError: # Timeout occurred, time to resubscribe if not PROCESSING_LOG: _process_task.cancel() try: await _process_task except asyncio.CancelledError: pass await unsubscribe(websocket, subscription_id) new_sub_id = await subscribe(websocket) if new_sub_id is None: break if new_sub_id > 1: # we sometimes get True instead of integer, so we cje subscription_id = new_sub_id logger.info(f"New subscription created with ID: {subscription_id}") elif new_sub_id is True: # Already subscribed logger.info("Already subscribed, continuing with existing subscription") if subscription_id: process_task = asyncio.create_task(process_messages(websocket)) else: # process_messages completed (shouldn't happen unless there's an error) break else: send_telegram_message("Failed to connect. Retrying...") # heartbeat_task.cancel() except websockets.exceptions.WebSocketException as e: logger.error(f"WebSocket error: {e}") except Exception as e: logger.error(f"An unexpected error occurred: {e}") await unsubscribe(websocket, subscription_id) await send_telegram_message("reconnecting...") logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") await websocket.close() except Exception as e: logger.error(f"An unexpected error occurred - breaking watch loop: {e}") await asyncio.sleep(reconnect_delay) reconnect_delay = min(reconnect_delay * 1.2, max_reconnect_delay) async def subscribe(websocket): request = { "jsonrpc": "2.0", "id": 1, "method": "logsSubscribe", "params": [ {"mentions": [FOLLOWED_WALLET]}, {"commitment": "confirmed"} ] } try: await websocket.send(json.dumps(request)) logger.info("Subscription request sent") return await process_messages(websocket) except Exception as e: logger.error(f"An unexpected error occurred: {e}") return None async def unsubscribe(websocket, subscription_id): if subscription_id: request = { "jsonrpc": "2.0", "id": 1, "method": "logsUnsubscribe", "params": [subscription_id] } await websocket.send(json.dumps(request)) logger.info(f"Unsubscribed from subscription id: {subscription_id}") subscription_id = None async def process_messages(websocket): try: while True: response = await websocket.recv() response_data = json.loads(response) logger.debug(f"Received response: {response_data}") if 'result' in response_data: new_sub_id = response_data['result'] if int(new_sub_id) > 1: subscription_id = new_sub_id logger.info(f"Subscription successful. New id: {subscription_id}") elif new_sub_id: logger.info(f"Existing subscription confirmed: {subscription_id}") else: return None return subscription_id elif 'params' in response_data: log = response_data['params']['result'] logger.debug(f"Received transaction log: {log}") asyncio.create_task(process_log(log)) else: logger.warning(f"Unexpected response: {response_data}") except websockets.exceptions.ConnectionClosedError as e: logger.error(f"Connection closed unexpectedly: {e}") # await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") pass except json.JSONDecodeError as e: logger.error(f"Failed to decode JSON: {e}") except Exception as e: logger.error(f"An unexpected error occurred: {e}") pk = os.getenv("PK") async def check_PK(): global pk if not pk: try: script_dir = os.path.dirname(os.path.abspath(__file__)) with open(os.path.join(script_dir, 'secret.pk'), 'r') as f: pk = f.read().strip() if pk: logging.info("Private key loaded successfully from file.") else: logging.warning("Private key file is empty.") except FileNotFoundError: logging.warning("Private key file not found.") except Exception as e: logging.error(f"Error reading private key file: {str(e)}") if not pk: logging.error("Private key not found in environment variables. Will not be able to sign transactions.") # send TG warning message await telegram_utils.send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.") # Convert Flask app to ASGI asgi_app = WsgiToAsgi(app) async def main(): global solanaAPI, bot, PROCESSING_LOG await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") await check_PK() # new: restart wallet_watch_loop every hour await solanaAPI.wallet_watch_loop() # while True: # wallet_watch_task = asyncio.create_task(solanaAPI.wallet_watch_loop()) # try: # # Wait for an hour or until the task completes, whichever comes first # await asyncio.wait_for(wallet_watch_task, timeout=3600) # except asyncio.TimeoutError: # # If an hour has passed, cancel the task if not PROCESSING # if PROCESSING_LOG: # logging.info("wallet_watch_loop is processing logs. Will not restart.") # await telegram_utils.send_telegram_message("wallet_watch_loop is processing logs. Will not restart.") # else: # wallet_watch_task.cancel() # try: # await wallet_watch_task # except asyncio.CancelledError: # logging.info("wallet_watch_loop was cancelled after running for an hour") # except Exception as e: # logging.error(f"Error in wallet_watch_loop: {str(e)}") # await telegram_utils.send_telegram_message(f"Error in wallet_watch_loop: {str(e)}") # logging.info("Restarting wallet_watch_loop") # await telegram_utils.send_telegram_message("Restarting wallet_watch_loop") def run_asyncio_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def run_all(): main_task = asyncio.create_task(main()) await main_task if __name__ == '__main__': # Create a new event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # Start the asyncio loop in a separate thread thread = Thread(target=run_asyncio_loop, args=(loop,)) thread.start() # Schedule the run_all coroutine in the event loop asyncio.run_coroutine_threadsafe(run_all(), loop) # Run Uvicorn in the main thread uvicorn.run( "app:asgi_app", # Replace 'app' with the actual name of this Python file if different host="127.0.0.1", port=3001, log_level="debug", reload=True ) # When Uvicorn exits, stop the asyncio loop loop.call_soon_threadsafe(loop.stop) thread.join()