import asyncio
import base64
import datetime
import json
import logging
import os
import re
import threading
from typing import List, Dict

import aiohttp
import requests
import websockets
from base58 import b58decode
from dexscreener import DexscreenerClient
from dotenv import load_dotenv, set_key
from flask import Flask, render_template, request, jsonify
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Confirmed
from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.websocket_api import connect
# NOTE: this ``Signature`` is shadowed by the solders import further down;
# the order of the two imports is kept so the same name keeps winning.
from solana.transaction import Signature
from solders.hash import Hash
from solders.instruction import Instruction
from solders.keypair import Keypair
from solders.message import Message
from solders.pubkey import Pubkey
from solders.signature import Signature
from solders.transaction import Transaction
from solders.transaction import VersionedTransaction
from telegram import Bot
from telegram.constants import ParseMode

load_dotenv()

app = Flask(__name__)


def get_latest_log_file():
    """Return the path of the most recently created ``log_*`` file in ./logs.

    Only regular files whose name starts with ``log_`` are considered
    (e.g. ``log_20241005_004103_143116.json``).

    Returns:
        The path of the newest log file, or ``None`` when the directory is
        missing, unreadable, or contains no matching file.
    """
    log_dir = './logs'
    try:
        candidates = [
            os.path.join(log_dir, name)
            for name in os.listdir(log_dir)
            if name.startswith('log_')
            and os.path.isfile(os.path.join(log_dir, name))
        ]
        if not candidates:
            # An empty directory is an expected state, not an error.
            return None
        return max(candidates, key=os.path.getctime)
    except OSError as e:
        logging.error(f"Error fetching latest log file: {e}")
        return None


# Flask route to retry processing the last log
# NOTE(review): this is a GET endpoint that can re-trigger a trade through
# process_log; consider restricting access to it.
@app.route('/retry-last-log', methods=['GET'])
def retry_last_log():
    """Re-run ``process_log`` on the newest saved log file.

    Returns:
        A ``(json, status)`` tuple: 200 on success, 404 when no log file
        exists, 500 when loading or processing the log fails.
    """
    latest_log_file = get_latest_log_file()
    if not latest_log_file:
        return jsonify({"error": "No log files found"}), 404

    try:
        with open(latest_log_file, 'r', encoding='utf-8') as f:
            log = json.load(f)

        # Run the asynchronous process_log function on a fresh event loop;
        # Flask handles this request in its own (synchronous) thread.
        asyncio.run(process_log(log))
        return jsonify({"status": "Log processed successfully"}), 200
    except Exception as e:
        logging.error(f"Error processing log: {e}")
        return jsonify({"error": "Failed to process log"}), 500


# Configuration (all values come from the environment / .env file)
DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID")
FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET")
YOUR_WALLET = os.getenv("YOUR_WALLET")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
SOLANA_WS_URL = os.getenv("SOLANA_WS_URL")
SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL")
DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD')

# Use the production Solana RPC endpoint
solana_client = AsyncClient(SOLANA_HTTP_URL)
dexscreener_client = DexscreenerClient()

# Initialize Telegram Bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)

# Token addresses (initialize with some known tokens)
TOKEN_ADDRESSES = {
    "SOL": "So11111111111111111111111111111111111111112",
    "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
    "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og",
}


async def send_telegram_message(message):
    """Send ``message`` (HTML parse mode) to the developer chat.

    Delivery is best-effort: any failure is logged and swallowed so that a
    Telegram outage never interrupts the caller.
    """
    try:
        await bot.send_message(
            chat_id=DEVELOPER_CHAT_ID,
            text=message,
            parse_mode=ParseMode.HTML,
        )
        logging.info(f"Telegram message sent: {message}")
    except Exception as e:
        logging.error(f"Error sending Telegram message: {str(e)}")
# Seconds to wait for a raw JSON-RPC HTTP call before giving up.
RPC_TIMEOUT_SECONDS = 30

# Log fragments that mark a swap instruction.
SWAP_MARKERS = ('Program log: Instruction: Swap', 'Program log: Instruction: Swap2')


async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
    """Return a price for every address in ``token_addresses``.

    CoinGecko is queried first; addresses it does not know are looked up on
    DexScreener; anything still missing is priced at 0.0 (with a warning).
    """
    prices = await get_prices_from_coingecko(token_addresses)

    # For tokens not found in CoinGecko, use DexScreener
    missing_tokens = set(token_addresses) - set(prices)
    if missing_tokens:
        prices.update(await get_prices_from_dexscreener(list(missing_tokens)))

    # If any tokens are still missing, set their prices to 0
    for token in set(token_addresses) - set(prices):
        prices[token] = 0.0
        logging.warning(f"Price not found for token {token}. Setting to 0.")

    return prices


async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
    """Fetch prices in DISPLAY_CURRENCY from CoinGecko's Solana token endpoint.

    Returns a mapping of address -> price containing only the addresses
    CoinGecko answered for. An empty input, a non-200 status or a network
    error yields an empty (or partial) mapping instead of raising, so the
    caller can fall back to another source.
    """
    addresses = [address for address in token_addresses if address]
    if not addresses:
        return {}

    currency = DISPLAY_CURRENCY.lower()
    url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
    params = {
        "contract_addresses": ",".join(addresses),
        "vs_currencies": currency,
    }

    prices = {}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    logging.error(f"Failed to get token prices from CoinGecko. Status: {response.status}")
                    return prices
                data = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        logging.error(f"Error fetching token prices from CoinGecko: {e}")
        return prices

    for address, price_info in data.items():
        if currency in price_info:
            prices[address] = price_info[currency]
    return prices


async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
    """Fetch USD prices from DexScreener, one request per address.

    NOTE: the value read is ``priceUsd``, regardless of DISPLAY_CURRENCY.
    Addresses without usable pair data are omitted from the result.
    """
    addresses = [address for address in token_addresses if address]
    if not addresses:
        return {}

    base_url = "https://api.dexscreener.com/latest/dex/tokens/"
    prices = {}

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_token_data(session, f"{base_url}{address}") for address in addresses]
        results = await asyncio.gather(*tasks)

    for address, result in zip(addresses, results):
        pairs = result.get('pairs') if isinstance(result, dict) else None
        if not pairs:
            logging.warning(f"No price data found on DexScreener for token {address}")
            continue
        try:
            # Use the first pair (usually the most liquid)
            prices[address] = float(pairs[0]['priceUsd'])
        except (KeyError, TypeError, ValueError):
            logging.warning(f"No price data found on DexScreener for token {address}")

    return prices


async def fetch_token_data(session, url):
    """GET ``url`` with ``session`` and return the decoded JSON, or None on failure."""
    try:
        async with session.get(url) as response:
            if response.status == 200:
                return await response.json()
            logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
            return None
    except Exception as e:
        logging.error(f"Error fetching data from {url}: {str(e)}")
        return None


async def get_sol_price_from_dexscreener() -> float:
    """Return the wrapped-SOL price from DexScreener, or 0.0 when unavailable."""
    sol_address = "So11111111111111111111111111111111111111112"  # Solana's wrapped SOL address
    prices = await get_prices_from_dexscreener([sol_address])
    return prices.get(sol_address, 0.0)


async def get_sol_price() -> float:
    """Return the SOL price in DISPLAY_CURRENCY, falling back to DexScreener.

    The file used to define this function twice; the later copy silently
    replaced the one with the fallback and returned None on failure. This
    single definition always returns a number.
    """
    currency = DISPLAY_CURRENCY.lower()
    url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={currency}"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    return data['solana'][currency]
                logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}")
    except (aiohttp.ClientError, asyncio.TimeoutError, KeyError, TypeError) as e:
        logging.error(f"Failed to get SOL price from CoinGecko: {e}")
    return await get_sol_price_from_dexscreener()


async def convert_balances_to_currency(balances, token_prices, sol_price):
    """Multiply each balance by its price.

    Args:
        balances: mapping of token key -> amount. The key ``'SOL'`` is
            priced with ``sol_price``; every other key is looked up in
            ``token_prices``.
        token_prices: mapping of token key -> price.
        sol_price: price of SOL, or None when unknown.

    Returns:
        Mapping of token key -> converted value, or None for tokens whose
        price is not available.
    """
    converted_balances = {}
    for token, amount in balances.items():
        price = sol_price if token == 'SOL' else token_prices.get(token)
        if price is None:
            converted_balances[token] = None  # Price not available
            logging.warning(f"Price not available for token {token}")
        else:
            converted_balances[token] = amount * price
    return converted_balances


async def get_token_balance(wallet_address, token_address):
    """Return the UI balance of ``token_address`` held by ``wallet_address``.

    Only the first token account for the mint is read. Returns 0 when the
    wallet has no account for the mint or when the RPC call fails.
    """
    try:
        response = await solana_client.get_token_accounts_by_owner_json_parsed(
            Pubkey.from_string(wallet_address),
            opts=TokenAccountOpts(mint=Pubkey.from_string(token_address)),
            commitment=Confirmed,
        )
        # The client returns typed response objects (see get_wallet_balances,
        # which already reads ``response.value``), not plain dicts.
        accounts = response.value
        if not accounts:
            logging.debug(f"No account found for {token_address} in {wallet_address}")
            return 0

        balance = await solana_client.get_token_account_balance(accounts[0].pubkey)
        # ui_amount may be None for some accounts; treat that as zero.
        amount = float(balance.value.ui_amount or 0)
        logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}")
        return amount
    except Exception as e:
        logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
        return 0


async def _post_json_rpc(payload):
    """POST ``payload`` to SOLANA_HTTP_URL and return the decoded JSON body.

    ``requests`` is blocking, so the call runs in the default executor to
    keep the event loop (and the websocket subscription) responsive.
    Raises ``requests.exceptions.RequestException`` on HTTP/network errors.
    """
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None,
        lambda: requests.post(
            SOLANA_HTTP_URL,
            headers={"Content-Type": "application/json"},
            data=json.dumps(payload),
            timeout=RPC_TIMEOUT_SECONDS,
        ),
    )
    response.raise_for_status()  # Raises an error for bad responses
    return response.json()


async def get_token_balance_rpc(wallet_address, token_address):
    """Same as ``get_token_balance`` but through raw JSON-RPC over HTTP.

    Returns 0 when no account or balance is found, or on a request error.
    """
    accounts_request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getTokenAccountsByOwner",
        "params": [
            wallet_address,
            {"mint": token_address},
            {"encoding": "jsonParsed"},
        ],
    }

    try:
        accounts = await _post_json_rpc(accounts_request)
        holders = (accounts.get('result') or {}).get('value')
        if not holders:
            logging.debug(f"No account found for {token_address} in {wallet_address}")
            return 0

        balance_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "getTokenAccountBalance",
            "params": [holders[0]['pubkey']],
        }
        balance = await _post_json_rpc(balance_request)
        value = (balance.get('result') or {}).get('value')
        if not value:
            logging.debug(f"No balance found for {token_address} in {wallet_address}")
            return 0

        # uiAmount can be null in the RPC response; treat that as zero.
        amount = float(value.get('uiAmount') or 0)
        logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}")
        return amount
    except requests.exceptions.RequestException as e:
        logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
        return 0


ENV_FILE = '.env'


async def save_subscription_id(subscription_id):
    """Record the websocket subscription id (currently only logged)."""
    # storing subscription id in .env file disabled
    # set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id))
    logging.getLogger(__name__).info(f"Saved subscription ID: {subscription_id}")


async def load_subscription_id():
    """Return SUBSCRIPTION_ID from the environment as an int, or None.

    A missing, empty or non-numeric value yields None.
    """
    subscription_id = os.getenv("SUBSCRIPTION_ID")
    if not subscription_id:
        return None
    try:
        return int(subscription_id)
    except ValueError:
        logging.warning(f"Ignoring non-numeric SUBSCRIPTION_ID: {subscription_id!r}")
        return None


async def get_token_name(mint_address):
    """Return the token symbol for ``mint_address`` if the RPC reply has one.

    NOTE(review): the reply of get_token_supply does not appear to carry a
    ``symbol`` field, so this most likely always returns None and callers
    fall back to the mint address - confirm, or replace with a token
    metadata lookup.
    """
    try:
        token_info = await solana_client.get_token_supply(Pubkey.from_string(mint_address))
        value = token_info.value
        # Accept both dict-style and attribute-style replies without raising.
        if isinstance(value, dict):
            symbol = value.get('symbol')
        else:
            symbol = getattr(value, 'symbol', None)
        if symbol:
            return symbol
    except Exception as e:
        logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
    return None


async def get_wallet_balances(wallet_address):
    """Return the non-zero balances of ``wallet_address``.

    Returns:
        Mapping whose keys are ``"<name> (<mint>)"`` for SPL tokens (name
        falls back to the mint) and ``'SOL'`` for the native balance; the
        values are UI amounts. On error, whatever was collected so far is
        returned.
    """
    balances = {}
    logging.info(f"Getting balances for wallet: {wallet_address}")

    try:
        response = await solana_client.get_token_accounts_by_owner_json_parsed(
            Pubkey.from_string(wallet_address),
            opts=TokenAccountOpts(
                program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
            ),
            commitment=Confirmed,
        )

        if response.value:
            for account in response.value:
                parsed_data = account.account.data.parsed
                if isinstance(parsed_data, dict) and 'info' in parsed_data:
                    info = parsed_data['info']
                    if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info:
                        mint = info['mint']
                        # uiAmount can be null; one such account must not
                        # abort the whole scan.
                        amount = float(info['tokenAmount'].get('uiAmount') or 0)
                        if amount > 0:
                            token_name = await get_token_name(mint) or mint
                            balances[f"{token_name} ({mint})"] = amount
                            logging.debug(f"Balance for {token_name} ({mint}): {amount}")
                else:
                    logging.warning(f"Unexpected data format for account: {account}")

        sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address))
        if sol_balance.value is not None:
            balances['SOL'] = sol_balance.value / 1e9  # lamports -> SOL
        else:
            logging.warning(f"SOL balance response missing for wallet: {wallet_address}")

    except Exception as e:
        logging.error(f"Error getting wallet balances: {str(e)}")

    return balances


def _mint_from_balance_key(key):
    """Extract ``<mint>`` from a ``"<name> (<mint>)"`` balance key, else None."""
    if key.endswith(')') and '(' in key:
        return key[key.rfind('(') + 1:-1]
    return None


def _summarize_converted(converted_balances):
    """Return (display lines, total value) for the positive converted balances."""
    lines = []
    total = 0
    for token, amount in converted_balances.items():
        if amount is not None and amount > 0:
            lines.append(f"{token}: {amount:.2f} {DISPLAY_CURRENCY}")
            total += amount
    return lines, total


async def list_initial_wallet_states():
    """Value both wallets in DISPLAY_CURRENCY, log the summary and send it.

    Side effects: rebinds the globals TOKEN_ADDRESSES (token key -> value
    of every positively valued token), FOLLOWED_WALLET_VALUE and
    YOUR_WALLET_VALUE.
    """
    global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE

    followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
    your_wallet_balances = await get_wallet_balances(YOUR_WALLET)

    # Balance keys look like "<name> (<mint>)", but the price APIs need the
    # bare mint address; resolve prices by mint and re-key them afterwards.
    key_to_mint = {}
    for key in set(followed_wallet_balances) | set(your_wallet_balances):
        mint = _mint_from_balance_key(key)
        if mint:
            key_to_mint[key] = mint

    mint_prices = await get_token_prices(sorted(set(key_to_mint.values())))
    token_prices = {
        key: mint_prices[mint]
        for key, mint in key_to_mint.items()
        if mint in mint_prices
    }
    sol_price = await get_sol_price()

    followed_converted_balances = await convert_balances_to_currency(
        followed_wallet_balances, token_prices, sol_price
    )
    your_converted_balances = await convert_balances_to_currency(
        your_wallet_balances, token_prices, sol_price
    )

    TOKEN_ADDRESSES = {
        token: amount
        for token, amount in {**followed_converted_balances, **your_converted_balances}.items()
        if amount is not None and amount > 0
    }
    logging.info(f"Monitoring balances for tokens: {TOKEN_ADDRESSES.keys()}")

    followed_wallet_state, FOLLOWED_WALLET_VALUE = _summarize_converted(followed_converted_balances)
    your_wallet_state, YOUR_WALLET_VALUE = _summarize_converted(your_converted_balances)

    message = (
        f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n"
        f"Followed Wallet ({FOLLOWED_WALLET}):\n"
        f"{chr(10).join(followed_wallet_state)}\n"
        f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
        f"Your Wallet ({YOUR_WALLET}):\n"
        f"{chr(10).join(your_wallet_state)}\n"
        f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
        f"Monitored Tokens:\n"
        f"{', '.join(TOKEN_ADDRESSES.keys())}"
    )

    logging.info(message)
    await send_telegram_message(message)


async def get_transaction_details_rpc(tx_signature, readfromDump=False):
    """Return the ``result`` of a ``getTransaction`` call for ``tx_signature``.

    Args:
        tx_signature: transaction signature string.
        readfromDump: when True and ./logs/transation_details.json exists,
            the saved response is used instead of calling the RPC node.

    Returns:
        The ``result`` object of the JSON-RPC response (for both the live
        and the dump path), or None on error or an unexpected response.
        Every live response is saved to the dump file.
    """
    dump_path = './logs/transation_details.json'
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getTransaction",
        "params": [
            tx_signature,
            {
                "encoding": "jsonParsed",
                "maxSupportedTransactionVersion": 0,
            },
        ],
    }

    try:
        if readfromDump and os.path.exists(dump_path):
            with open(dump_path, 'r', encoding='utf-8') as f:
                transaction_details = json.load(f)
        else:
            transaction_details = await _post_json_rpc(payload)
            os.makedirs('./logs', exist_ok=True)
            with open(dump_path, 'w', encoding='utf-8') as f:
                json.dump(transaction_details, f, indent=2)
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching transaction details: {e}")
        return None
    except (OSError, ValueError) as e:
        logging.error(f"Error reading or writing transaction dump: {e}")
        return None

    if isinstance(transaction_details, dict) and 'result' in transaction_details:
        logging.debug(f"Transaction details: {transaction_details['result']}")
        return transaction_details['result']

    logging.error(f"Unexpected response: {transaction_details}")
    return None


async def save_log(log):
    """Write ``log`` as JSON to ./logs/log_<timestamp>.json (best-effort)."""
    try:
        os.makedirs('./logs', exist_ok=True)
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filename = f"./logs/log_{timestamp}.json"

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(log, f, indent=2)
    except Exception as e:
        logging.error(f"Error saving RPC log: {e}")


async def process_log(log_result):
    """React to one ``logsSubscribe`` notification.

    Failed transactions and transactions without a swap instruction are
    ignored. For a swap, the logs are parsed, a Telegram notice is sent and
    the move is mirrored with ``follow_move`` - at most once per
    transaction, even when several log lines mention a swap.
    """
    try:
        value = log_result['value']
        if value.get('err'):
            return

        tx_signature_str = value.get('signature')
        logs = value.get('logs') or []

        # Detect swap operations in logs
        if not any(marker in entry for entry in logs for marker in SWAP_MARKERS):
            return

        try:
            details = await parse_swap_logs(logs)
            message_text = (
                f"Swap detected:\n"
                f"Order ID: {details['order_id']}\n"
                f"Token In: {details['token_in']}\n"
                f"Token Out: {details['token_out']}\n"
                f"Amount In USD: {details['amount_in_USD']}\n"
                f"Percentage Swapped: {details['percentage_swapped']:.2f}%"
            )
            await send_telegram_message(message_text)
            await follow_move(details)
        except Exception as e:
            logging.error(f"Error handling swap in transaction {tx_signature_str}: {e}")
    except Exception as e:
        logging.error(f"Error processing log: {e}")


# "Program log: Instruction: Swap2",
# "Program log: order_id: 13985890735038016",
# "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn", source
# "Program log: EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", target
# "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410",
# "Program log: after_source_balance: 0, after_destination_balance: 472509072",
# "Program log: source_token_change: 58730110139, destination_token_change: 270131294",

logger = logging.getLogger(__name__)

# Raw on-chain amounts are scaled by this factor throughout this section.
# NOTE(review): this assumes every token has 6 decimals (true for USDC, not
# for SOL, which has 9) - confirm, or look up the decimals per mint.
TOKEN_UNIT = 1e6

# Shape of a base58-encoded Solana public key (used to recognise mint lines).
_BASE58_PUBKEY_RE = re.compile(r"^[1-9A-HJ-NP-Za-km-z]{32,44}$")
_AMOUNT_RE = re.compile(r"(amount_in|amount_out): (\d+)")
_BEFORE_SOURCE_RE = re.compile(r"before_source_balance: (\d+)")
_SOURCE_CHANGE_RE = re.compile(r"source_token_change: (\d+)")
_DESTINATION_CHANGE_RE = re.compile(r"destination_token_change: (\d+)")


def extract_swap_fields(logs):
    """Pull the raw swap figures out of a transaction's program logs.

    The two base58 public-key lines that follow a swap instruction are read
    as the input and output token; other log text is never mistaken for a
    token. ``amount_in`` is taken from ``amount_in:`` and then overridden by
    ``source_token_change:`` (the actual change) when that line is present.

    Returns:
        Dict with ``order_id`` (str or None), ``token_in`` / ``token_out``
        (str or None) and the raw integer amounts ``amount_in``,
        ``amount_out_expected``, ``amount_out_actual`` and
        ``before_source_balance`` (0 when absent).
    """
    fields = {
        "order_id": None,
        "token_in": None,
        "token_out": None,
        "amount_in": 0,
        "amount_out_expected": 0,
        "amount_out_actual": 0,
        "before_source_balance": 0,
    }

    for log in logs:
        if "Program log:" in log:
            payload = log.split("Program log: ")[-1].strip()
            if "order_id:" in log:
                fields["order_id"] = log.split("order_id: ")[-1].strip()
            elif "Instruction: Swap" in log:
                # A new swap instruction starts: forget earlier candidates.
                fields["token_in"] = None
                fields["token_out"] = None
            elif _BASE58_PUBKEY_RE.match(payload):
                if fields["token_in"] is None:
                    fields["token_in"] = payload
                elif fields["token_out"] is None:
                    fields["token_out"] = payload

        before_match = _BEFORE_SOURCE_RE.search(log)
        if before_match:
            fields["before_source_balance"] = int(before_match.group(1))

        # "expect_amount_out: N" is matched here through its "amount_out: N" tail.
        for amount_type, value in _AMOUNT_RE.findall(log):
            if amount_type == "amount_in":
                fields["amount_in"] = int(value)
            else:
                fields["amount_out_expected"] = int(value)

        source_match = _SOURCE_CHANGE_RE.search(log)
        if source_match:
            fields["amount_in"] = int(source_match.group(1))
        destination_match = _DESTINATION_CHANGE_RE.search(log)
        if destination_match:
            fields["amount_out_actual"] = int(destination_match.group(1))

    return fields


async def parse_swap_logs(logs):
    """Parse swap logs into amounts, values and the share of balance swapped.

    Returns:
        Dict with ``order_id``, ``token_in``, ``token_out``, ``amount_in``,
        ``amount_out_expected``, ``amount_out_actual`` (scaled by
        TOKEN_UNIT), ``amount_in_USD``, ``amount_out_USD`` and
        ``percentage_swapped`` (0-based percentage of the source balance).

    Raises:
        ValueError: when the input or output token cannot be identified.
    """
    fields = extract_swap_fields(logs)
    token_in = fields["token_in"]
    token_out = fields["token_out"]
    if not token_in or not token_out:
        raise ValueError("Could not identify swap tokens in transaction logs")

    amount_in = fields["amount_in"]
    amount_out_actual = fields["amount_out_actual"]
    before_source_balance = fields["before_source_balance"]

    token_prices = await get_token_prices([token_in, token_out])
    amount_in_usd = amount_in / TOKEN_UNIT * token_prices.get(token_in, 0)
    amount_out_usd = amount_out_actual / TOKEN_UNIT * token_prices.get(token_out, 0)

    # Calculate the percentage of the source balance that was swapped
    percentage_swapped = (amount_in / before_source_balance) * 100 if before_source_balance > 0 else 0

    return {
        "order_id": fields["order_id"],
        "token_in": token_in,
        "token_out": token_out,
        "amount_in": amount_in / TOKEN_UNIT,
        "amount_out_expected": fields["amount_out_expected"] / TOKEN_UNIT,
        "amount_out_actual": amount_out_actual / TOKEN_UNIT,
        "amount_in_USD": amount_in_usd,
        "amount_out_USD": amount_out_usd,
        "percentage_swapped": percentage_swapped,
    }


def _balance_for_mint(balances, mint):
    """Return the balance stored for ``mint``.

    ``get_wallet_balances`` keys SPL tokens as ``"<name> (<mint>)"``, so a
    plain ``balances.get(mint)`` never matches; look for the mint suffix.
    """
    if mint in balances:
        return balances[mint]
    suffix = f"({mint})"
    for key, amount in balances.items():
        if key.endswith(suffix):
            return amount
    return 0


async def follow_move(move):
    """Mirror a followed swap with the same percentage of our own balance.

    Args:
        move: dict produced by ``parse_swap_logs`` (uses ``token_in``,
            ``token_out`` and ``percentage_swapped``).

    The outcome (success, failure or insufficient balance) is logged and
    reported over Telegram; no exception is propagated for swap errors.
    """
    your_balances = await get_wallet_balances(YOUR_WALLET)
    your_balance = _balance_for_mint(your_balances, move['token_in'])

    # Calculate the amount to swap based on the same percentage as the
    # followed move (never more than the whole balance).
    percentage = max(0.0, min(float(move['percentage_swapped']), 100.0))
    amount_to_swap = your_balance * (percentage / 100)

    if amount_to_swap <= 0:
        message = (
            f"Move Failed:\n"
            f"Insufficient balance to swap {amount_to_swap:.6f} {move['token_in']}"
        )
        logging.warning(message)
        await send_telegram_message(message)
        return

    async_client = None
    try:
        # Imported here because the file's import block does not provide them.
        from solana.rpc.commitment import Processed
        from solders.message import to_bytes_versioned

        encoded_key = os.getenv("PK")
        if not encoded_key:
            raise ValueError("PK environment variable is not set")
        private_key = Keypair.from_bytes(b58decode(encoded_key))

        # AsyncClient speaks JSON-RPC over HTTP, so it needs the HTTP URL.
        async_client = AsyncClient(SOLANA_HTTP_URL)
        jupiter = Jupiter(async_client, private_key)

        # jupiter.swap returns the unsigned transaction, base64-encoded.
        transaction_data = await jupiter.swap(
            input_mint=move['token_in'],
            output_mint=move['token_out'],
            amount=int(amount_to_swap * TOKEN_UNIT),  # UI amount -> raw units
            slippage_bps=1,
        )

        raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
        signature = private_key.sign_message(to_bytes_versioned(raw_transaction.message))
        signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
        opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
        result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
        transaction_id = json.loads(result.to_json())['result']

        message = (
            f"Move Followed:\n"
            f"Swapped {amount_to_swap:.6f} {move['token_in']} "
            f"(same {percentage:.2f}% as followed wallet)\n"
            f"for {move['token_out']}\n"
            f"Transaction: {transaction_id}"
        )
        logging.info(message)
        await send_telegram_message(message)
    except Exception as e:
        error_message = f"Swap Error:\n{str(e)}"
        logging.error(error_message)
        await send_telegram_message(error_message)
    finally:
        if async_client is not None:
            await async_client.close()


# Helper functions (implement these according to your needs)


async def on_logs(log):
    """Persist a log notification to disk, then process it."""
    logging.debug(f"Received log: {log}")
    await save_log(log)
    await process_log(log)


async def subscribe_to_wallet():
    """Watch FOLLOWED_WALLET through ``logsSubscribe`` and reconnect forever.

    Each notification is handed to ``on_logs``. After a dropped connection
    the loop waits with exponential backoff (5 s doubling up to 60 s); the
    delay returns to 5 s once a connection is established again.
    """
    uri = SOLANA_WS_URL  # e.g. wss://api.mainnet-beta.solana.com
    initial_reconnect_delay = 5  # Start with a 5-second delay
    max_reconnect_delay = 60  # Maximum delay of 60 seconds
    reconnect_delay = initial_reconnect_delay

    request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "logsSubscribe",
        "params": [
            {"mentions": [FOLLOWED_WALLET]},
            {"commitment": "confirmed"},
        ],
    }

    while True:
        try:
            async with websockets.connect(uri) as websocket:
                logger.info("Connected to Solana websocket")
                # A working connection ends the current backoff sequence.
                reconnect_delay = initial_reconnect_delay

                await websocket.send(json.dumps(request))
                logger.info("Subscription request sent")

                while True:
                    try:
                        response = await websocket.recv()
                        response_data = json.loads(response)
                        logger.debug(f"Received response: {response_data}")

                        if 'result' in response_data:
                            subscription_id = response_data['result']
                            await save_subscription_id(subscription_id)
                            logger.info(f"Subscription successful. Subscription id: {subscription_id}")
                            await send_telegram_message("Connected to Solana network. Watching for transactions now.")
                            await list_initial_wallet_states()
                        elif 'params' in response_data:
                            await on_logs(response_data['params']['result'])
                        else:
                            logger.warning(f"Unexpected response: {response}")

                    except websockets.exceptions.ConnectionClosed as e:
                        logger.error(f"Connection closed unexpectedly: {e}")
                        break
                    except json.JSONDecodeError as e:
                        # A malformed frame is skipped; the connection stays up.
                        logger.error(f"Failed to decode JSON: {e}")
                    except Exception as e:
                        logger.error(f"An unexpected error occurred: {e}")
                        break

        except websockets.exceptions.WebSocketException as e:
            logger.error(f"WebSocket error: {e}")
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")

        logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
        await asyncio.sleep(reconnect_delay)

        # Implement exponential backoff
        reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)


async def main():
    """Configure logging, announce start-up and run the subscription loop."""
    # Initialize logging
    logging.basicConfig(level=logging.DEBUG)

    await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
    await subscribe_to_wallet()


def run_flask():
    """Serve the Flask app on port 3001 (blocking call)."""
    # Run Flask app without the reloader, so we can run the async main function
    app.run(debug=False, port=3001, use_reloader=False)


if __name__ == '__main__':
    # Start Flask once, in a background thread; it is a daemon so it does
    # not keep the process alive after the async main loop stops.
    flask_thread = threading.Thread(target=run_flask, daemon=True)
    flask_thread.start()

    # Run the async main function
    asyncio.run(main())