import asyncio import websockets import json from flask import Flask, render_template, request, jsonify from solana.rpc.async_api import AsyncClient from solana.rpc.commitment import Confirmed from solders.pubkey import Pubkey from dexscreener import DexscreenerClient from telegram import Bot from telegram.constants import ParseMode import datetime import logging from solana.rpc.websocket_api import connect from solana.rpc.async_api import AsyncClient from solana.rpc.commitment import Confirmed from solana.rpc.types import TokenAccountOpts import base64 import os from dotenv import load_dotenv import aiohttp from typing import List, Dict load_dotenv() app = Flask(__name__) # Use the production Solana RPC endpoint solana_client = AsyncClient("https://api.mainnet-beta.solana.com") dexscreener_client = DexscreenerClient() # Configuration DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") YOUR_WALLET = os.getenv("YOUR_WALLET") TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") SOLANA_URL = os.getenv("SOLANA_NET_URL") DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') # Initialize Telegram Bot bot = Bot(token=TELEGRAM_BOT_TOKEN) # Token addresses (initialize with some known tokens) TOKEN_ADDRESSES = { "SOL": "So11111111111111111111111111111111111111112", "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og" } async def send_telegram_message(message): try: await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML) logging.info(f"Telegram message sent: {message}") except Exception as e: logging.error(f"Error sending Telegram message: {str(e)}") # async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: # url = "https://api.coingecko.com/api/v3/simple/token_price/solana" # params = { # "contract_addresses": ",".join(token_addresses), # "vs_currencies": DISPLAY_CURRENCY.lower() # } # prices = {} # async with aiohttp.ClientSession() as session: # async with session.get(url, params=params) as response: # if response.status == 200: # data = await response.json() # for address, price_info in data.items(): # if DISPLAY_CURRENCY.lower() in price_info: # prices[address] = price_info[DISPLAY_CURRENCY.lower()] # else: # logging.error(f"Failed to get token prices. Status: {response.status}") # # For tokens not found in CoinGecko, try to get price from a DEX or set a default value # missing_tokens = set(token_addresses) - set(prices.keys()) # for token in missing_tokens: # # You might want to implement a fallback method here, such as: # # prices[token] = await get_price_from_dex(token) # # For now, we'll set a default value # prices[token] = 0.0 # logging.warning(f"Price not found for token {token}. Setting to 0.") # return prices async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: coingecko_prices = await get_prices_from_coingecko(token_addresses) # For tokens not found in CoinGecko, use DexScreener missing_tokens = set(token_addresses) - set(coingecko_prices.keys()) if missing_tokens: dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens)) coingecko_prices.update(dexscreener_prices) # If any tokens are still missing, set their prices to 0 for token in set(token_addresses) - set(coingecko_prices.keys()): coingecko_prices[token] = 0.0 logging.warning(f"Price not found for token {token}. Setting to 0.") return coingecko_prices async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]: url = "https://api.coingecko.com/api/v3/simple/token_price/solana" params = { "contract_addresses": ",".join(token_addresses), "vs_currencies": DISPLAY_CURRENCY.lower() } prices = {} async with aiohttp.ClientSession() as session: async with session.get(url, params=params) as response: if response.status == 200: data = await response.json() for address, price_info in data.items(): if DISPLAY_CURRENCY.lower() in price_info: prices[address] = price_info[DISPLAY_CURRENCY.lower()] else: logging.error(f"Failed to get token prices from CoinGecko. Status: {response.status}") return prices async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]: base_url = "https://api.dexscreener.com/latest/dex/tokens/" prices = {} async with aiohttp.ClientSession() as session: tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] results = await asyncio.gather(*tasks) for address, result in zip(token_addresses, results): if result and 'pairs' in result and result['pairs']: pair = result['pairs'][0] # Use the first pair (usually the most liquid) prices[address] = float(pair['priceUsd']) else: logging.warning(f"No price data found on DexScreener for token {address}") return prices async def fetch_token_data(session, url): try: async with session.get(url) as response: if response.status == 200: return await response.json() else: logging.error(f"Failed to fetch data from {url}. Status: {response.status}") return None except Exception as e: logging.error(f"Error fetching data from {url}: {str(e)}") return None async def get_sol_price() -> float: url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}" async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 200: data = await response.json() return data['solana'][DISPLAY_CURRENCY.lower()] else: logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}") return await get_sol_price_from_dexscreener() async def get_sol_price_from_dexscreener() -> float: sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address prices = await get_prices_from_dexscreener([sol_address]) return prices.get(sol_address, 0.0) async def get_sol_price() -> float: url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}" async with aiohttp.ClientSession() as session: async with session.get(url) as response: if response.status == 200: data = await response.json() return data['solana'][DISPLAY_CURRENCY.lower()] else: logging.error(f"Failed to get SOL price. Status: {response.status}") return None async def convert_balances_to_currency(balances, token_prices, sol_price): converted_balances = {} for token, amount in balances.items(): if token == 'SOL': converted_balances[token] = amount * sol_price elif token in token_prices: converted_balances[token] = amount * token_prices[token] else: converted_balances[token] = None # Price not available logging.warning(f"Price not available for token {token}") return converted_balances async def get_token_balance(wallet_address, token_address): try: response = await solana_client.get_token_accounts_by_owner( Pubkey.from_string(wallet_address), {"mint": Pubkey.from_string(token_address)} ) if response['result']['value']: balance = await solana_client.get_token_account_balance( response['result']['value'][0]['pubkey'] ) amount = float(balance['result']['value']['uiAmount']) logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}") return amount else: logging.debug(f"No account found for {token_address} in {wallet_address}") return 0 except Exception as e: logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)}") return 0 class SolanaEncoder(json.JSONEncoder): def default(self, obj): if hasattr(obj, '__dict__'): return obj.__dict__ return str(obj) async def get_wallet_balances(wallet_address): balances = {} token_addresses = [] logging.info(f"Getting balances for wallet: {wallet_address}") try: response = await solana_client.get_token_accounts_by_owner_json_parsed( Pubkey.from_string(wallet_address), opts=TokenAccountOpts( program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") ), commitment=Confirmed ) if response.value: for account in response.value: parsed_data = account.account.data.parsed if isinstance(parsed_data, dict) and 'info' in parsed_data: info = parsed_data['info'] if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info: mint = info['mint'] amount = float(info['tokenAmount']['uiAmount']) if amount > 0: token_addresses.append(mint) balances[mint] = amount logging.debug(f"Balance for {mint}: {amount}") else: logging.warning(f"Unexpected data format for account: {account}") sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) if sol_balance.value is not None: balances['SOL'] = sol_balance.value / 1e9 else: logging.warning(f"SOL balance response missing for wallet: {wallet_address}") except Exception as e: logging.error(f"Error getting wallet balances: {str(e)}") return balances, token_addresses async def get_converted_balances(wallet_address): balances, token_addresses = await get_wallet_balances(wallet_address) token_prices = await get_token_prices(token_addresses) sol_price = await get_sol_price() converted_balances = await convert_balances_to_currency(balances, token_prices, sol_price) return converted_balances async def send_initial_wallet_states(followed_wallet, your_wallet): followed_balances = await get_converted_balances(followed_wallet) your_balances = await get_converted_balances(your_wallet) message = f"Initial Wallet States (Non-zero balances in {DISPLAY_CURRENCY}):\n\n" message += f"Followed Wallet ({followed_wallet}):\n" for token, amount in followed_balances.items(): if amount and amount > 0: message += f"{token}: {amount:.2f}\n" message += f"\nYour Wallet ({your_wallet}):\n" for token, amount in your_balances.items(): if amount and amount > 0: message += f"{token}: {amount:.2f}\n" message += "\nMonitored Tokens:\n" # Add monitored tokens logic here if needed await bot.send_message(chat_id=CHAT_ID, text=message) async def get_non_zero_token_balances(wallet_address): non_zero_balances = {} logging.info(f"Getting non-zero balances for wallet: {wallet_address}") for token, address in TOKEN_ADDRESSES.items(): balance = await get_token_balance(wallet_address, address) if balance > 0: non_zero_balances[token] = address logging.debug(f"Non-zero balance for {token}: {balance}") return non_zero_balances async def list_initial_wallet_states(): global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE followed_wallet_balances, followed_token_addresses = await get_wallet_balances(FOLLOWED_WALLET) your_wallet_balances, your_token_addresses = await get_wallet_balances(YOUR_WALLET) all_token_addresses = list(set(followed_token_addresses + your_token_addresses)) token_prices = await get_token_prices(all_token_addresses) sol_price = await get_sol_price() followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, token_prices, sol_price) your_converted_balances = await convert_balances_to_currency(your_wallet_balances, token_prices, sol_price) TOKEN_ADDRESSES = {token: amount for token, amount in {**followed_converted_balances, **your_converted_balances}.items() if amount is not None and amount > 0} logging.info(f"Monitoring balances for tokens: {TOKEN_ADDRESSES.keys()}") followed_wallet_state = [] FOLLOWED_WALLET_VALUE = 0 for token, amount in followed_converted_balances.items(): if amount is not None and amount > 0: followed_wallet_state.append(f"{token}: {amount:.2f} {DISPLAY_CURRENCY}") FOLLOWED_WALLET_VALUE += amount your_wallet_state = [] YOUR_WALLET_VALUE = 0 for token, amount in your_converted_balances.items(): if amount is not None and amount > 0: your_wallet_state.append(f"{token}: {amount:.2f} {DISPLAY_CURRENCY}") YOUR_WALLET_VALUE += amount message = ( f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" f"Followed Wallet ({FOLLOWED_WALLET}):\n" f"{chr(10).join(followed_wallet_state)}\n" f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" f"Your Wallet ({YOUR_WALLET}):\n" f"{chr(10).join(your_wallet_state)}\n" f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" f"Monitored Tokens:\n" f"{', '.join(TOKEN_ADDRESSES.keys())}" ) logging.info(message) await send_telegram_message(message) async def follow_move(move): followed_balances = await get_wallet_balances(FOLLOWED_WALLET) your_balances = await get_wallet_balances(YOUR_WALLET) if move['token'] not in followed_balances or move['token'] not in your_balances: logging.error(f"Invalid token: {move['token']}") return followed_balance = followed_balances[move['token']] your_balance = your_balances[move['token']] proportion = your_balance / followed_balance if followed_balance > 0 else 0 amount_to_swap = move['amount'] * proportion if your_balance >= amount_to_swap: # Implement actual swap logic here pair = dexscreener_client.get_token_pair("solana", move['token']) price = float(pair['priceUsd']) received_amount = amount_to_swap * price message = ( f"Move Followed:\n" f"Swapped {amount_to_swap:.6f} {move['token']} " f"for {received_amount:.6f} {move['to_token']}" ) logging.info(message) await send_telegram_message(message) else: message = ( f"Move Failed:\n" f"Insufficient balance to swap {amount_to_swap:.6f} {move['token']}" ) logging.warning(message) await send_telegram_message(message) followed_balances = await get_wallet_balances(FOLLOWED_WALLET) your_balances = await get_wallet_balances(YOUR_WALLET) if move['token'] not in followed_balances or move['token'] not in your_balances: logging.error(f"Invalid token: {move['token']}") return followed_balance = followed_balances[move['token']] your_balance = your_balances[move['token']] proportion = your_balance / followed_balance if followed_balance > 0 else 0 amount_to_swap = move['amount'] * proportion if your_balance >= amount_to_swap: # Implement actual swap logic here pair = dexscreener_client.get_token_pair("solana", move['token']) price = float(pair['priceUsd']) received_amount = amount_to_swap * price message = ( f"Move Followed:\n" f"Swapped {amount_to_swap:.6f} {move['token']} " f"for {received_amount:.6f} {move['to_token']}" ) logging.info(message) await send_telegram_message(message) else: message = ( f"Move Failed:\n" f"Insufficient balance to swap {amount_to_swap:.6f} {move['token']}" ) logging.warning(message) await send_telegram_message(message) async def on_logs(log): print(f"Received log: {log}") try: if 'err' in log and log['err']: return if 'value' in log and 'logs' in log['value']: tx = log['value']['signature'] logs = log['value']['logs'] # Fetch transaction details tx_result = await solana_client.get_transaction(tx) if tx_result and 'result' in tx_result and tx_result['result']: transaction = tx_result['result']['transaction'] message = transaction['message'] for log_entry in logs: if 'Program log: Instruction: Swap' in log_entry: # Handle swap event for instruction in message['instructions']: if instruction['programId'] == TOKEN_ADDRESSES['SOL']: # This is a token transfer from_pubkey = instruction['accounts'][0] to_pubkey = instruction['accounts'][1] amount = int(instruction['data'], 16) / 1e9 # Convert lamports to SOL if from_pubkey == FOLLOWED_WALLET: move = { 'token': 'SOL', 'amount': amount, 'to_token': 'Unknown' # You might want to determine this based on the receiving address } await follow_move(move) # Send a Telegram message about the swap message_text = f"Swap detected:\nFrom: {from_pubkey}\nTo: {to_pubkey}\nAmount: {amount} SOL" await send_telegram_message(message_text) else: print(f"Unexpected log format: {log}") except: print(f"error processing RPC log") async def subscribe_to_wallet(): uri = SOLANA_URL reconnect_delay = 5 # Start with a 5-second delay max_reconnect_delay = 60 # Maximum delay of 60 seconds while True: try: async with websockets.connect(uri) as websocket: logger.info("Connected to Solana websocket") request = { "jsonrpc": "2.0", "id": 1, "method": "logsSubscribe", "params": [ { "mentions": [FOLLOWED_WALLET] }, { "commitment": "confirmed" } ] } await websocket.send(json.dumps(request)) logger.info("Subscription request sent") while True: try: response = await websocket.recv() response_data = json.loads(response) if 'result' in response_data: logger.info(f"Subscription successful. Subscription id: {response_data['result']}") elif 'params' in response_data: await on_logs(response_data['params']['result']) else: logger.warning(f"Unexpected response: {response}") except websockets.exceptions.ConnectionClosedError as e: logger.error(f"Connection closed unexpectedly: {e}") break except json.JSONDecodeError as e: logger.error(f"Failed to decode JSON: {e}") except Exception as e: logger.error(f"An unexpected error occurred: {e}") break except websockets.exceptions.WebSocketException as e: logger.error(f"WebSocket error: {e}") except Exception as e: logger.error(f"An unexpected error occurred: {e}") logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") await asyncio.sleep(reconnect_delay) # Implement exponential backoff reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay) logger = logging.getLogger(__name__) async def main(): # Initialize logging logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.INFO) await send_telegram_message("Solana Agent Application Started") await list_initial_wallet_states() await subscribe_to_wallet() if __name__ == '__main__': asyncio.run(main())