import asyncio
import datetime
import json
import os
from threading import Thread

import uvicorn
import websockets
from asgiref.wsgi import WsgiToAsgi
from dotenv import load_dotenv
from jupiter_python_sdk.jupiter import Jupiter
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Processed
from solana.rpc.types import TxOpts
from solders.keypair import Keypair
from solders.transaction import VersionedTransaction

from modules.webui import init_app
from modules.storage import init_db, store_transaction
from modules.utils import telegram_utils, logging, get_pk
from modules.log_processor import watch_for_new_logs
from modules.SolanaAPI import SAPI

# config = load_config()
load_dotenv()
load_dotenv('.env.secret')

# Configuration (imported after load_dotenv() on purpose: keep this order)
from config import (DO_WATCH_WALLET, logging, logger)

# # # # # # # # # # TELEGRAM # # # # # # # # # #
# if not telegram_utils.bot:
#     try:
#         asyncio.run(telegram_utils.initialize())
#     except Exception as e:
#         logging.error(f"Error initializing Telegram bot: {str(e)}")

# async def telegram_utils.send_telegram_message(message):
#     try:
#         await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML)
#         logging.info(f"Telegram message sent: {message}")
#         # logging.info(f"Telegram message dummy sent: {message}")
#     except Exception as e:
#         logging.error(f"Error sending Telegram message: {str(e)}")

# # # # # # # # # # DATABASE # # # # # # # # # #

# # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # #

# # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # #

# # # # # # # # # # Functionality # # # # # # # # # #


async def save_log(log):
    """Dump *log* as indented JSON to ``./logs/log_<timestamp>.json``.

    The ``./logs`` directory is created when missing. Any failure is logged
    and swallowed, so saving a log never interrupts the caller.
    """
    try:
        os.makedirs('./logs', exist_ok=True)
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filename = f"./logs/log_{timestamp}.json"
        with open(filename, 'w', encoding="utf-8") as f:
            json.dump(log, f, indent=2)
    except Exception as e:
        logging.error(f"Error saving RPC log: {e}")


# True while at least the most recently finished process_log() call was still
# running. NOTE(review): a single boolean shared by concurrently running
# process_log() tasks; the first task to finish clears it for all of them.
PROCESSING_LOG = False

# Exact log lines that mark a transaction as containing a swap instruction.
_SWAP_INSTRUCTION_LOGS = (
    'Program log: Instruction: Swap',
    'Program log: Instruction: Swap2',
    'Program log: Instruction: SwapExactAmountIn',
    'Program log: Instruction: SwapV2',
)

# Raw amounts in the logs are scaled down by this factor (assumes 6 decimals).
_TOKEN_AMOUNT_DIVISOR = 10 ** 6


def _log_field_value(text):
    """Return the stripped text that follows the last ':' in *text*."""
    return text.split(":")[-1].strip()


def _parse_swap_logs(logs, tr_details):
    """Fill *tr_details* in place from *logs* and return the balance figures.

    Returns a ``(before_source_balance, source_token_change)`` tuple, both
    scaled by ``_TOKEN_AMOUNT_DIVISOR`` and both ``0`` when not present.

    Example of the lines being parsed:
        "Program log: before_source_balance: 19471871, before_destination_balance: 0, amount_in: 19471871, expect_amount_out: 770877527, min_return: 763168752",
        "Program log: after_source_balance: 0, after_destination_balance: 770570049",
        "Program log: source_token_change: 19471871, destination_token_change: 770570049",
    """
    before_source_balance = 0
    source_token_change = 0

    for index, log_entry in enumerate(logs):
        # The order_id line is expected to be followed by the token_in and
        # token_out lines. Only accept it when both exist, instead of raising
        # IndexError and aborting the whole parse.
        if (
            tr_details["order_id"] is None
            and "order_id" in log_entry
            and index + 2 < len(logs)
        ):
            tr_details["order_id"] = _log_field_value(log_entry)
            tr_details["token_in"] = _log_field_value(logs[index + 1])
            tr_details["token_out"] = _log_field_value(logs[index + 2])

        if "source_token_change" in log_entry:
            for part in log_entry.split(", "):
                if "source_token_change" in part:
                    source_token_change = float(_log_field_value(part)) / _TOKEN_AMOUNT_DIVISOR
                    tr_details["amount_in"] = source_token_change
                elif "destination_token_change" in part:
                    tr_details["amount_out"] = float(_log_field_value(part)) / _TOKEN_AMOUNT_DIVISOR

        if "before_source_balance" in log_entry:
            for part in log_entry.split(", "):
                if "before_source_balance" in part:
                    before_source_balance = float(_log_field_value(part)) / _TOKEN_AMOUNT_DIVISOR

    return before_source_balance, source_token_change


async def process_log(log_result):
    """Inspect one transaction log notification and follow a detected swap.

    *log_result* is indexed as ``log_result['value']`` with the keys ``err``,
    ``logs`` and ``signature``. When ``err`` is truthy the function returns
    ``None`` immediately. Otherwise, if one of the known swap instruction
    lines is present in ``logs``, the log is saved to disk, swap details are
    parsed from the log lines (falling back to
    ``SAPI.get_transaction_details_info`` when incomplete), a Telegram
    message is sent and ``SAPI.follow_move`` is called.

    Errors are logged rather than raised. Returns the details dict, which may
    be only partially filled if processing failed part-way.
    """
    global PROCESSING_LOG

    tr_details = {
        "order_id": None,
        "token_in": None,
        "token_out": None,
        "amount_in": 0,
        "amount_out": 0,
        "amount_in_USD": 0,
        "amount_out_USD": 0,
        "percentage_swapped": 0
    }

    if log_result['value']['err']:
        return

    logs = log_result['value']['logs']
    PROCESSING_LOG = True
    try:
        # Detect swap operations in logs
        if any(op in logs for op in _SWAP_INSTRUCTION_LOGS):
            # Save the log to a file
            await save_log(log_result)
            tx_signature_str = log_result['value']['signature']

            before_source_balance, source_token_change = _parse_swap_logs(logs, tr_details)

            try:
                # GET DETAILS FROM TRANSACTION IF NOT FOUND IN LOGS
                details_incomplete = (
                    tr_details["token_in"] is None
                    or tr_details["token_out"] is None
                    or tr_details["amount_in"] == 0
                    or tr_details["amount_out"] == 0
                )
                if details_incomplete:
                    logging.warning("Incomplete swap details found in logs. Getting details from transaction")
                    # NOTE(review): the returned object replaces tr_details
                    # entirely; it is presumably a dict with the same keys.
                    tr_details = await SAPI.get_transaction_details_info(tx_signature_str, logs)

                if before_source_balance > 0 and source_token_change > 0:
                    tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100
                    # dirty fix for percentage > 100 (decimals 9 but expecting 6)
                    # NOTE(review): nesting reconstructed from a flattened
                    # source; confirm this check belongs inside this branch.
                    if tr_details["percentage_swapped"] > 100:
                        tr_details["percentage_swapped"] = tr_details["percentage_swapped"] / 1000

                # update token info: ToDo: check, but already did
                # all_token_addresses = list(set([tr_details["token_in"], tr_details["token_out"]]))
                # await get_token_prices(all_token_addresses)
                try:
                    token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]]
                    token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]]
                    tr_details["symbol_in"] = token_in.get('symbol')
                    tr_details["symbol_out"] = token_out.get('symbol')
                    tr_details['amount_in_USD'] = tr_details['amount_in'] * token_in.get('price', 0)
                    tr_details['amount_out_USD'] = tr_details['amount_out'] * token_out.get('price', 0)
                except Exception as e:
                    logging.error(f"Error fetching token prices: {e}")

                # Without token symbols the move is not followed. Fail with
                # an explicit error instead of an incidental KeyError raised
                # while formatting the message below.
                missing = [key for key in ("symbol_in", "symbol_out") if key not in tr_details]
                if missing:
                    raise LookupError(f"token info unavailable, missing {', '.join(missing)}")

                message_text = (
                    f"Swap detected: \n"
                    f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} ({tr_details['percentage_swapped']:.2f}% ) swapped for "  # ({tr_details['token_in']}) ({tr_details['token_out']})
                    f"{tr_details['symbol_out']} \n"
                )
                await telegram_utils.send_telegram_message(message_text)
                await SAPI.follow_move(tr_details)
                await SAPI.save_token_info()
            except Exception as e:
                logging.error(f"Error aquiring log details and following: {e}")
                await telegram_utils.send_telegram_message("Not followed! Error following move.")
    except Exception as e:
        logging.error(f"Error processing log: {e}")
    finally:
        # Reset in `finally` so the flag is cleared even if the task is cancelled.
        PROCESSING_LOG = False

    return tr_details


# async def follow_move_legacy(move):
#     global pk
#     if pk is None:
#         pk = await get_pk()
#     your_balances = await SAPI.dex.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
#     your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
#     if your_balance_info is not None:
#         # Use the balance
#         print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
#     else:
#         print(f"No ballance found for {move['symbol_in']}. Skipping move.")
#         await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
#         return
#     your_balance = your_balance_info['amount']
#     token_info = SAPI.dex.TOKENS_INFO.get(move['token_in'])
#     token_name_in = token_info.get('symbol') or await SAPI.get_token_metadata(move['token_in'])
#     token_name_out = SAPI.dex.TOKENS_INFO[move['token_out']].get('symbol') or await SAPI.get_token_metadata_symbol(move['token_out'])
#     if not your_balance:
#         msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move."
#         logging.warning(msg)
#         await telegram_utils.send_telegram_message(msg)
#         return
#     if FOLLOW_AMOUNT == 'percentage':
#         # Calculate the amount to swap based on the same percentage as the followed move
#         amount_to_swap = your_balance * (move['percentage_swapped'] / 100)
#     elif FOLLOW_AMOUNT == 'exact':
#         amount_to_swap = move['amount_in']
#     else:
#         try:
#             fixed_amount = float(FOLLOW_AMOUNT)  # in USD
#             fixed_amount_in_token = fixed_amount / move["token_in_price"]
#             amount_to_swap = min(fixed_amount_in_token, your_balance)
#         except ValueError:
#             msg = f"Move not followed:\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'.
#             Must be 'percentage' or a number."
#             logging.warning(msg)
#             await telegram_utils.send_telegram_message(msg)
#             return
#     amount_to_swap = min(amount_to_swap, your_balance)  # Ensure we're not trying to swap more than we have
#     decimals = token_info.get('decimals')
#     # Convert to lamports
#     # if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9
#     amount = int(amount_to_swap * 10**decimals)
#     amount = int(amount)
#     logging.debug(f"Calculated amount in lamports: {amount}")
#     if your_balance < amount_to_swap:  # should not happen
#         msg = (
#             f"Warning:\n"
#             f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway."
#         )
#         logging.warning(msg)
#         await telegram_utils.send_telegram_message(msg)
#     try:
#         try:
#             notification = (
#                 f"Initiating move:\n"
#                 f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}"
#                 + (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "")
#             )
#             # logging.info(notification)
#             # error_logger.info(notification)
#             await telegram_utils.send_telegram_message(notification)
#         except Exception as e:
#             logging.error(f"Error sending notification: {e}")
#         for retry in range(3):
#             try:
#                 private_key = Keypair.from_bytes(base58.b58decode(pk))
#                 async_client = AsyncClient(SOLANA_WS_URL)
#                 jupiter = Jupiter(async_client, private_key)
#                 transaction_data = await jupiter.swap(
#                     input_mint=move['token_in'],
#                     output_mint=move['token_out'],
#                     amount=amount,
#                     slippage_bps=300,  # Increased to 3%
#                 )
#                 logging.info(f"Initiating move. Transaction data:\n {transaction_data}")
#                 error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}")
#                 raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
#                 message = raw_transaction.message
#                 signature = private_key.sign_message(message.to_bytes_versioned())
#                 signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
#                 opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
#                 # send the transaction
#                 result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
#                 transaction_id = json.loads(result.to_json())['result']
#                 print(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}")
#                 # append to notification
#                 notification += f"\n\nTransaction: {transaction_id}"
#                 await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
#                 tx_details = await SAPI.get_transaction_details_with_retry(transaction_id)
#                 if tx_details is not None:
#                     break
#                 else:
#                     logging.warning(f"Failed to get transaction details for {transaction_id}. Probably transaction failed. Retrying again...")
#                     await asyncio.sleep(3)
#             except Exception as e:
#                 error_message = f"Move Failed:\n{str(e)}\n{transaction_data}\n{move}"
#                 logging.error(error_message)
#                 # log the errors to /logs/errors.log
#                 error_logger.error(error_message)
#                 error_logger.exception(e)
#                 await telegram_utils.send_telegram_message(error_message)
#             amount = amount * 0.75
#         await SAPI.dex.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
#         try:
#             if tx_details is None:
#                 logging.info(f"Failed to get transaction details for {transaction_id}")
#                 notification = (
#                     f"Move Followed, failed to get transaction details.\n"
#                     f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['token_in']}) "
#                     f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
#                     f"\n\nTransaction: {transaction_id}"
#                     # log_successful_swap ()
#                 )
#             else:
#                 notification = (
#                     f"Move Followed:\n"
#                     f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['symbol_in']}) "
#                     f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
#                     f"for {tx_details['amount_out']:.2f} {token_name_out}"
#                     # f"Amount In USD: {tr_details['amount_in_USD']}\n"
#                     f"\n\nTransaction: {transaction_id}"
#                 )
#             logging.info(notification)
#             await telegram_utils.send_telegram_message(notification)
#         except Exception as e:
#             logging.error(f"Error sending notification: {e}")
#     except Exception as e:
#         error_message = f"Swap Follow Error:\n{str(e)}"
#         logging.error(error_message)
#         # log the errors to /logs/errors.log
#         error_logger.error(error_message)
#         error_logger.exception(e) \
#         # if error_message contains 'Program log: Error: insufficient funds'
#         if 'insufficient funds' in error_message:
#             await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move.
#             Please check your balance.")
#         else:
#             await telegram_utils.send_telegram_message(error_message)


# Helper functions

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected
# before it finishes.
_BACKGROUND_TASKS = set()


def _spawn_background(coro):
    """Schedule *coro* as a task and keep it referenced until it is done."""
    task = asyncio.create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return task


async def process_messages(websocket):
    """Read JSON messages from *websocket* until a subscription reply arrives.

    Each received message is decoded with ``json.loads``:

    * a message with a ``'result'`` key is treated as a subscription reply;
      the function returns that result, or ``None`` when it is falsy;
    * a message with a ``'params'`` key has ``params['result']`` handed to
      ``process_log`` in a background task, and reading continues;
    * anything else is logged as unexpected.

    Connection, JSON and other errors are logged and make the function
    return ``None``.
    """
    try:
        while True:
            response = await websocket.recv()
            response_data = json.loads(response)
            logger.debug(f"Received response: {response_data}")

            if 'result' in response_data:
                new_sub_id = response_data['result']
                if not new_sub_id:
                    return None
                if int(new_sub_id) > 1:
                    logger.info(f"Subscription successful. New id: {new_sub_id}")
                else:
                    # Previously this branch read an unassigned local
                    # (`subscription_id`) and raised UnboundLocalError.
                    logger.info(f"Existing subscription confirmed: {new_sub_id}")
                return new_sub_id
            elif 'params' in response_data:
                log = response_data['params']['result']
                logger.debug(f"Received transaction log: {log}")
                _spawn_background(process_log(log))
            else:
                logger.warning(f"Unexpected response: {response_data}")
    except websockets.exceptions.ConnectionClosedError as e:
        logger.error(f"Connection closed unexpectedly: {e}")
        # await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
    except json.JSONDecodeError as e:
        logger.error(f"Failed to decode JSON: {e}")
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")


pk = None
app = init_app()
# app = init_app(follow_move_legacy)

# Convert Flask app to ASGI
asgi_app = WsgiToAsgi(app)


async def restartable_task(task_func, *args, **kwargs):
    """Run ``task_func(*args, **kwargs)`` forever, restarting it on failure.

    When the awaited call raises, the error is logged, a Telegram
    notification is attempted, and the call is retried after 5 seconds.
    A failure to send the notification is logged and does not stop the loop.
    """
    while True:
        try:
            await task_func(*args, **kwargs)
        except Exception as e:
            logging.error(f"Error in task {task_func.__name__}: {e}")
            try:
                await telegram_utils.send_telegram_message(f"Error in task {task_func.__name__}: {e}")
            except Exception as notify_error:
                # A broken notification channel must not kill the supervisor.
                logging.error(f"Error sending task failure notification: {notify_error}")
            # NOTE(review): placement reconstructed from a flattened source;
            # the delay is applied only after a failure.
            await asyncio.sleep(5)  # Wait before retrying


async def main():
    """Initialise the agent and run its supervised background loops.

    Loads the private key into the module-level ``pk``, initialises Telegram,
    announces start-up, then runs the log watcher and, when
    ``DO_WATCH_WALLET`` is set, the wallet watch loop, each wrapped in
    ``restartable_task``.

    The wallet watch loop used to be started twice (once supervised and once
    awaited directly). It is now started once, only when ``DO_WATCH_WALLET``
    is set, and ``main`` stays alive by awaiting the supervised tasks.
    """
    global pk

    pk = await get_pk()
    await telegram_utils.initialize()
    await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")

    # Start the log processor
    tasks = [asyncio.create_task(restartable_task(watch_for_new_logs))]

    # process_transaction
    if DO_WATCH_WALLET:
        tasks.append(asyncio.create_task(restartable_task(SAPI.wallet_watch_loop)))

    # Holding the tasks here keeps them referenced and keeps main() running.
    await asyncio.gather(*tasks)


def run_asyncio_tasks():
    """Entry point for the worker process: run ``main()`` in a new event loop."""
    asyncio.run(main())


if __name__ == '__main__':
    import multiprocessing

    # Start the asyncio tasks in a separate process
    process = multiprocessing.Process(target=run_asyncio_tasks)
    process.start()

    # Run the ASGI server
    uvicorn.run(
        "app:asgi_app",
        host="0.0.0.0",
        port=3001,
        log_level="info",  # "debug"
        reload=True
    )

    # Wait for the asyncio tasks to complete
    process.join()