import asyncio import websockets import json from flask import Flask, render_template, request, jsonify from solana.rpc.async_api import AsyncClient from solana.rpc.commitment import Confirmed from solders.pubkey import Pubkey from dexscreener import DexscreenerClient from telegram import Bot from telegram.constants import ParseMode import datetime import logging from solana.rpc.websocket_api import connect from solana.rpc.async_api import AsyncClient from solana.rpc.commitment import Confirmed from solana.rpc.types import TokenAccountOpts import base64 import os from dotenv import load_dotenv load_dotenv() app = Flask(__name__) # Use the production Solana RPC endpoint solana_client = AsyncClient("https://api.mainnet-beta.solana.com") dexscreener_client = DexscreenerClient() # Configuration DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") YOUR_WALLET = os.getenv("YOUR_WALLET") TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") SOLANA_URL = os.getenv("SOLANA_NET_URL") # Initialize Telegram Bot bot = Bot(token=TELEGRAM_BOT_TOKEN) # Initialize logging logging.basicConfig(level=logging.DEBUG) # Token addresses (initialize with some known tokens) TOKEN_ADDRESSES = { "SOL": "So11111111111111111111111111111111111111112", "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og" } async def send_telegram_message(message): try: await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML) logging.info(f"Telegram message sent: {message}") except Exception as e: logging.error(f"Error sending Telegram message: {str(e)}") async def get_token_balance(wallet_address, token_address): try: response = await solana_client.get_token_accounts_by_owner( Pubkey.from_string(wallet_address), {"mint": Pubkey.from_string(token_address)} ) if response['result']['value']: balance = await solana_client.get_token_account_balance( response['result']['value'][0]['pubkey'] ) amount = float(balance['result']['value']['uiAmount']) logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}") return amount else: logging.debug(f"No account found for {token_address} in {wallet_address}") return 0 except Exception as e: logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)}") return 0 async def get_wallet_balances(wallet_address): balances = {} logging.info(f"Getting balances for wallet: {wallet_address}") opts = TokenAccountOpts( program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"), encoding="jsonParsed" # Ensure encoding is set to jsonParsed ) # Get all token accounts for the wallet response = await solana_client.get_token_accounts_by_owner( Pubkey.from_string(wallet_address), opts, commitment=Confirmed ) if response.value: for account in response.value: mint = account.account.data.parsed.info.mint balance_response = await solana_client.get_token_account_balance(account.pubkey) if balance_response.value: amount = float(balance_response.value.uiAmount) balances[mint] = amount logging.debug(f"Balance for {mint}: {amount}") sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) if sol_balance.value: balances['SOL'] = sol_balance.value / 1e9 return balances async def get_non_zero_token_balances(wallet_address): non_zero_balances = {} logging.info(f"Getting non-zero balances for wallet: {wallet_address}") for token, address in TOKEN_ADDRESSES.items(): balance = await get_token_balance(wallet_address, address) if balance > 0: non_zero_balances[token] = address logging.debug(f"Non-zero balance for {token}: {balance}") return non_zero_balances async def list_initial_wallet_states(): global TOKEN_ADDRESSES followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) your_wallet_balances = await get_wallet_balances(YOUR_WALLET) followed_non_zero = await get_non_zero_token_balances(FOLLOWED_WALLET) your_non_zero = await get_non_zero_token_balances(YOUR_WALLET) TOKEN_ADDRESSES = {**followed_non_zero, **your_non_zero} logging.info(f"Monitoring balances for tokens: {TOKEN_ADDRESSES.keys()}") followed_wallet_state = "\n".join([f"{token}: {amount:.6f}" for token, amount in followed_wallet_balances.items() if amount > 0]) your_wallet_state = "\n".join([f"{token}: {amount:.6f}" for token, amount in your_wallet_balances.items() if amount > 0]) message = ( "Initial Wallet States (Non-zero balances):\n\n" f"Followed Wallet ({FOLLOWED_WALLET}):\n" f"{followed_wallet_state}\n\n" f"Your Wallet ({YOUR_WALLET}):\n" f"{your_wallet_state}\n\n" f"Monitored Tokens:\n" f"{', '.join(TOKEN_ADDRESSES.keys())}" ) logging.info(message) await send_telegram_message(message) async def follow_move(move): followed_balances = await get_wallet_balances(FOLLOWED_WALLET) your_balances = await get_wallet_balances(YOUR_WALLET) if move['token'] not in followed_balances or move['token'] not in your_balances: logging.error(f"Invalid token: {move['token']}") return followed_balance = followed_balances[move['token']] your_balance = your_balances[move['token']] proportion = your_balance / followed_balance if followed_balance > 0 else 0 amount_to_swap = move['amount'] * proportion if your_balance >= amount_to_swap: # Implement actual swap logic here pair = dexscreener_client.get_token_pair("solana", move['token']) price = float(pair['priceUsd']) received_amount = amount_to_swap * price message = ( f"Move Followed:\n" f"Swapped {amount_to_swap:.6f} {move['token']} " f"for {received_amount:.6f} {move['to_token']}" ) logging.info(message) await send_telegram_message(message) else: message = ( f"Move Failed:\n" f"Insufficient balance to swap {amount_to_swap:.6f} {move['token']}" ) logging.warning(message) await send_telegram_message(message) async def on_logs(log): print(f"Received log: {log}") if 'err' in log and log['err']: return tx = log['signature'] tx_result = await solana_client.get_transaction(tx) if tx_result and 'result' in tx_result and tx_result['result']: transaction = tx_result['result']['transaction'] message = transaction['message'] for instruction in message['instructions']: if instruction['programId'] == TOKEN_ADDRESSES['SOL']: # This is a token transfer from_pubkey = instruction['accounts'][0] to_pubkey = instruction['accounts'][1] amount = int(instruction['data'], 16) / 1e9 # Convert lamports to SOL if from_pubkey == FOLLOWED_WALLET: move = { 'token': 'SOL', 'amount': amount, 'to_token': 'Unknown' # You might want to determine this based on the receiving address } await follow_move(move) async def subscribe_to_wallet(): uri = SOLANA_URL async with websockets.connect(uri) as websocket: # Correct the `params` format to be an array request = { "jsonrpc": "2.0", "id": 1, "method": "logsSubscribe", "params": [ { "mentions": [FOLLOWED_WALLET] # Changed from YOUR_WALLET to FOLLOWED_WALLET }, { "commitment": "confirmed" } ] } await websocket.send(json.dumps(request)) # Listen for messages while True: response = await websocket.recv() response_data = json.loads(response) if 'result' in response_data: print(f"Subscription successful. Subscription id: {response_data['result']}") elif 'params' in response_data: await on_logs(response_data['params']['result']) else: print(f"Unexpected response: {response}") async def main(): logging.basicConfig(level=logging.INFO) await send_telegram_message("Solana Agent Application Started") await list_initial_wallet_states() await subscribe_to_wallet() if __name__ == '__main__': asyncio.run(main())