import asyncio import websockets import json from flask import Flask, render_template, request, jsonify from solana.rpc.async_api import AsyncClient from solana.rpc.commitment import Confirmed from solders.pubkey import Pubkey from dexscreener import DexscreenerClient from telegram import Bot from telegram.constants import ParseMode import datetime import logging from solana.rpc.websocket_api import connect from solana.rpc.async_api import AsyncClient from solana.rpc.commitment import Confirmed import os from dotenv import load_dotenv load_dotenv() app = Flask(__name__) # Use the production Solana RPC endpoint solana_client = AsyncClient("https://api.mainnet-beta.solana.com") dexscreener_client = DexscreenerClient() # Configuration DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") YOUR_WALLET = os.getenv("YOUR_WALLET") TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") SOLANA_URL = os.getenv("SOLANA_NET_URL") # Initialize Telegram Bot bot = Bot(token=TELEGRAM_BOT_TOKEN) # Initialize logging logging.basicConfig(level=logging.DEBUG) # Token addresses (initialize with some known tokens) TOKEN_ADDRESSES = { "SOL": "So11111111111111111111111111111111111111112", "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og" } async def send_telegram_message(message): try: await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML) logging.info(f"Telegram message sent: {message}") except Exception as e: logging.error(f"Error sending Telegram message: {str(e)}") async def get_token_balance(wallet_address, token_address): try: response = await solana_client.get_token_accounts_by_owner( Pubkey.from_string(wallet_address), {"mint": Pubkey.from_string(token_address)} ) if response['result']['value']: balance = await solana_client.get_token_account_balance( response['result']['value'][0]['pubkey'] ) amount = float(balance['result']['value']['uiAmount']) logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}") return amount else: logging.debug(f"No account found for {token_address} in {wallet_address}") return 0 except Exception as e: logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)}") return 0 async def get_wallet_balances(wallet_address): balances = {} logging.info(f"Getting balances for wallet: {wallet_address}") for token, address in TOKEN_ADDRESSES.items(): balance = await get_token_balance(wallet_address, address) balances[token] = balance logging.debug(f"Balance for {token}: {balance}") return balances async def get_non_zero_token_balances(wallet_address): non_zero_balances = {} logging.info(f"Getting non-zero balances for wallet: {wallet_address}") for token, address in TOKEN_ADDRESSES.items(): balance = await get_token_balance(wallet_address, address) if balance > 0: non_zero_balances[token] = address logging.debug(f"Non-zero balance for {token}: {balance}") return non_zero_balances async def list_initial_wallet_states(): global TOKEN_ADDRESSES followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) your_wallet_balances = await get_wallet_balances(YOUR_WALLET) followed_non_zero = await get_non_zero_token_balances(FOLLOWED_WALLET) your_non_zero = await get_non_zero_token_balances(YOUR_WALLET) TOKEN_ADDRESSES = {**followed_non_zero, **your_non_zero} logging.info(f"Monitoring balances for tokens: {TOKEN_ADDRESSES.keys()}") followed_wallet_state = "\n".join([f"{token}: {amount:.6f}" for token, amount in followed_wallet_balances.items() if amount > 0]) your_wallet_state = "\n".join([f"{token}: {amount:.6f}" for token, amount in your_wallet_balances.items() if amount > 0]) message = ( "Initial Wallet States (Non-zero balances):\n\n" f"Followed Wallet ({FOLLOWED_WALLET}):\n" f"{followed_wallet_state}\n\n" f"Your Wallet ({YOUR_WALLET}):\n" f"{your_wallet_state}\n\n" f"Monitored Tokens:\n" f"{', '.join(TOKEN_ADDRESSES.keys())}" ) logging.info(message) await send_telegram_message(message) async def follow_move(move): followed_balances = await get_wallet_balances(FOLLOWED_WALLET) your_balances = await get_wallet_balances(YOUR_WALLET) if move['token'] not in followed_balances or move['token'] not in your_balances: logging.error(f"Invalid token: {move['token']}") return followed_balance = followed_balances[move['token']] your_balance = your_balances[move['token']] proportion = your_balance / followed_balance if followed_balance > 0 else 0 amount_to_swap = move['amount'] * proportion if your_balance >= amount_to_swap: # Implement actual swap logic here pair = dexscreener_client.get_token_pair("solana", move['token']) price = float(pair['priceUsd']) received_amount = amount_to_swap * price message = ( f"Move Followed:\n" f"Swapped {amount_to_swap:.6f} {move['token']} " f"for {received_amount:.6f} {move['to_token']}" ) logging.info(message) await send_telegram_message(message) else: message = ( f"Move Failed:\n" f"Insufficient balance to swap {amount_to_swap:.6f} {move['token']}" ) logging.warning(message) await send_telegram_message(message) async def on_logs(logs): print(f"Received logs: {logs}") if logs.value.err: return tx = logs.value.signature tx_result = await solana_client.get_transaction(tx) if tx_result and tx_result.value: for instruction in tx_result.value.transaction.message.instructions: if instruction.program_id == Pubkey.from_string(TOKEN_ADDRESSES['SOL']): # This is a token transfer from_pubkey = instruction.accounts[0] to_pubkey = instruction.accounts[1] amount = instruction.data[8:] # The amount is stored in the last 8 bytes amount = int.from_bytes(amount, 'little') / 1e9 # Convert lamports to SOL if from_pubkey == Pubkey.from_string(FOLLOWED_WALLET): move = { 'token': 'SOL', 'amount': amount, 'to_token': 'Unknown' # You might want to determine this based on the receiving address } await follow_move(move) async def subscribe_to_wallet(): uri = SOLANA_URL async with websockets.connect(uri) as websocket: # Correct the `params` format to be an object request = { "jsonrpc": "2.0", "id": 1, "method": "logsSubscribe", "params": { "mentions": [YOUR_WALLET], "commitment": "confirmed" } } await websocket.send(json.dumps(request)) # Listen for messages while True: response = await websocket.recv() print(response) async def main(): logging.basicConfig(level=logging.INFO) await send_telegram_message("Solana Agent Application Started") await list_initial_wallet_states() await subscribe_to_wallet() if __name__ == '__main__': asyncio.run(main())