"""Entry point for the Solana swap-following agent.

Starts a background worker that processes queued transaction logs, the
monitoring threads (log watcher and, optionally, wallet watcher), and the web
UI served by uvicorn in a separate process.
"""
import concurrent.futures
import threading
import queue
import uvicorn
import os
from dotenv import load_dotenv
import datetime
import json
import websockets
import logging
from modules.webui import init_app
# NOTE: this rebinds the name `logging` imported above; every `logging.*`
# call in this file goes through whatever `modules.utils` exports.
from modules.utils import telegram_utils, logging, get_pk
from modules.log_processor import watch_for_new_logs
from modules.SolanaAPI import SAPI
from config import DO_WATCH_WALLET
from asgiref.wsgi import WsgiToAsgi
from multiprocessing import Process
import time

load_dotenv()
load_dotenv('.env.secret')


def save_log(log):
    """Dump *log* as indented JSON into ``./logs/log_<timestamp>.json``.

    Best-effort: any failure (directory creation, serialization, write) is
    logged and swallowed so that log archiving never interrupts processing.
    """
    try:
        os.makedirs('./logs', exist_ok=True)
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filename = f"./logs/log_{timestamp}.json"
        with open(filename, 'w') as f:
            json.dump(log, f, indent=2)
    except Exception as e:
        logging.error(f"Error saving RPC log: {e}")


# True while process_log is working on a swap. NOTE(review): process_log runs
# on a 4-thread pool, so this single flag is only a rough indicator.
PROCESSING_LOG = False
# Raw transaction logs waiting to be handled by log_processor_worker.
log_queue = queue.Queue()

# Exact log lines that mark a transaction as a swap.
_SWAP_INSTRUCTIONS = (
    'Program log: Instruction: Swap',
    'Program log: Instruction: Swap2',
    'Program log: Instruction: SwapExactAmountIn',
    'Program log: Instruction: SwapV2',
)

# Raw on-chain amounts are divided by this to get token units.
# NOTE(review): assumes every token uses 6 decimals - confirm.
_TOKEN_AMOUNT_SCALE = 10 ** 6


def _last_field(text):
    """Return the stripped text after the last ':' in *text*."""
    return text.split(":")[-1].strip()


def _scaled_amount(part):
    """Parse the numeric value after the last ':' in *part* and scale it."""
    return float(_last_field(part)) / _TOKEN_AMOUNT_SCALE


def _parse_swap_logs(logs, tr_details):
    """Fill *tr_details* in place from *logs*.

    Returns:
        A ``(before_source_balance, source_token_change)`` tuple, each 0 when
        the corresponding field was not found in the logs.
    """
    before_source_balance = 0
    source_token_change = 0

    for i, log_entry in enumerate(logs):
        if tr_details["order_id"] is None and "order_id" in log_entry:
            tr_details["order_id"] = _last_field(log_entry)
            # The two entries after the order id carry the token addresses.
            # When the log is truncated, leave them as None so the caller
            # falls back to fetching the transaction details.
            if i + 2 < len(logs):
                tr_details["token_in"] = _last_field(logs[i + 1])
                tr_details["token_out"] = _last_field(logs[i + 2])

        parts = log_entry.split(", ")

        if "source_token_change" in log_entry:
            for part in parts:
                if "source_token_change" in part:
                    amount = _scaled_amount(part)
                    tr_details["amount_in"] = amount
                    source_token_change = amount
                elif "destination_token_change" in part:
                    tr_details["amount_out"] = _scaled_amount(part)

        if "before_source_balance" in log_entry:
            for part in parts:
                if "before_source_balance" in part:
                    before_source_balance = _scaled_amount(part)

    return before_source_balance, source_token_change


def process_log(log_result):
    """Inspect one transaction log and, if it is a swap, follow it.

    For a swap the log is archived with ``save_log``, the swap details are
    parsed from the log lines (falling back to
    ``SAPI.get_transaction_details_info`` when they are incomplete), a
    Telegram notification is sent and ``SAPI.follow_move`` is invoked.

    Args:
        log_result: Mapping with a ``'value'`` entry holding ``'err'``,
            ``'logs'`` and ``'signature'``.

    Returns:
        The transaction-details dict, or ``None`` when the transaction itself
        reported an error.
    """
    global PROCESSING_LOG
    tr_details = {
        "order_id": None,
        "token_in": None,
        "token_out": None,
        "amount_in": 0,
        "amount_out": 0,
        "amount_in_USD": 0,
        "amount_out_USD": 0,
        "percentage_swapped": 0
    }

    if log_result['value']['err']:
        return

    logs = log_result['value']['logs']
    try:
        PROCESSING_LOG = True

        if any(op in logs for op in _SWAP_INSTRUCTIONS):
            save_log(log_result)
            tx_signature_str = log_result['value']['signature']

            before_source_balance, source_token_change = _parse_swap_logs(logs, tr_details)

            try:
                if tr_details["token_in"] is None or tr_details["token_out"] is None or \
                        tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0:
                    logging.warning("Incomplete swap details found in logs. Getting details from transaction")
                    tr_details = SAPI.get_transaction_details_info(tx_signature_str, logs)

                if before_source_balance > 0 and source_token_change > 0:
                    tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100
                    # NOTE(review): intent of the /1000 adjustment is not
                    # evident from this file - confirm before changing.
                    if tr_details["percentage_swapped"] > 100:
                        tr_details["percentage_swapped"] = tr_details["percentage_swapped"] / 1000

                try:
                    token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]]
                    token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]]

                    tr_details["symbol_in"] = token_in.get('symbol')
                    tr_details["symbol_out"] = token_out.get('symbol')
                    tr_details['amount_in_USD'] = tr_details['amount_in'] * token_in.get('price', 0)
                    tr_details['amount_out_USD'] = tr_details['amount_out'] * token_out.get('price', 0)
                except Exception as e:
                    logging.error(f"Error fetching token prices: {e}")

                # NOTE(review): if the lookup above failed and tr_details has
                # no 'symbol_in'/'symbol_out', building this message raises
                # KeyError and the move is NOT followed (handled below).
                message_text = (
                    f"Swap detected: \n"
                    f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} "
                    f"({tr_details['percentage_swapped']:.2f}% ) swapped for {tr_details['symbol_out']} \n"
                )
                telegram_utils.send_telegram_message(message_text)
                SAPI.follow_move(tr_details)
                SAPI.save_token_info()

            except Exception as e:
                logging.error(f"Error acquiring log details and following: {e}")
                telegram_utils.send_telegram_message(f"Not followed! Error following move.")

    except Exception as e:
        logging.error(f"Error processing log: {e}")
    finally:
        PROCESSING_LOG = False

    return tr_details


# Disabled websocket message handler, kept for reference.
# def process_messages(websocket):
#     try:
#         while True:
#             response = websocket.recv()
#             response_data = json.loads(response)
#             logger.debug(f"Received response: {response_data}")
#             if 'result' in response_data:
#                 new_sub_id = response_data['result']
#                 if int(new_sub_id) > 1:
#                     subscription_id = new_sub_id
#                     logger.info(f"Subscription successful. New id: {subscription_id}")
#                 elif new_sub_id:
#                     logger.info(f"Existing subscription confirmed: {subscription_id}")
#                 else:
#                     return None
#                 return subscription_id
#             elif 'params' in response_data:
#                 log = response_data['params']['result']
#                 logger.debug(f"Received transaction log: {log}")
#                 log_queue.put(log)
#             else:
#                 logger.warning(f"Unexpected response: {response_data}")
#     except Exception as e:
#         logger.error(f"An error occurred: {e}")


def log_processor_worker():
    """Forever pull logs from ``log_queue`` and hand them to a thread pool.

    Each log is submitted to ``process_log`` on a 4-worker pool. The queue
    item is marked done as soon as it has been submitted (not when processing
    finishes).
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        while True:
            # Fetch outside the try block: task_done() must only be called
            # for an item that was actually retrieved.
            log = log_queue.get()
            try:
                executor.submit(process_log, log)
            except Exception as e:
                logger.error(f"Error in log processor worker: {e}")
            finally:
                log_queue.task_done()


logger = logging.getLogger(__name__)
app = init_app()
asgi_app = WsgiToAsgi(app)


def run_with_retry(task_func, *args, **kwargs):
    """Run *task_func* forever, restarting it 5 seconds after it stops.

    Exceptions raised by the task are logged and reported via Telegram. The
    pause applies after a normal return as well, so a task that exits
    immediately cannot spin in a tight loop.
    """
    while True:
        try:
            task_func(*args, **kwargs)
        except Exception as e:
            error_msg = f"Error in task {task_func.__name__}: {e}"
            logger.error(error_msg)
            telegram_utils.send_telegram_message(error_msg)
        time.sleep(5)


def init_bot():
    """Initialize Telegram and start the monitoring threads.

    Starts ``watch_for_new_logs`` in a daemon thread and, when
    ``DO_WATCH_WALLET`` is set, ``SAPI.wallet_watch_loop`` in another; both
    are wrapped in ``run_with_retry``.
    """
    # Initialize bot components
    # pk = get_pk()
    telegram_utils.initialize()
    telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")

    # Start monitoring tasks
    threading.Thread(target=run_with_retry, args=(watch_for_new_logs,), daemon=True).start()

    if DO_WATCH_WALLET:
        threading.Thread(target=run_with_retry, args=(SAPI.wallet_watch_loop,), daemon=True).start()


def run_server():
    """Serve the web UI with uvicorn on 0.0.0.0:3001 (auto-reload enabled).

    NOTE(review): the ``"app:asgi_app"`` import string presumes this module
    is importable as ``app`` - confirm against the file name.
    """
    uvicorn.run(
        "app:asgi_app",
        host="0.0.0.0",
        port=3001,
        log_level="info",
        reload=True
    )


def main():
    """Start the log worker, the bot and the web server, then block.

    The server runs in a child process that is terminated and joined when the
    main loop exits (for example on Ctrl+C).
    """
    # Start log processor worker
    log_processor_thread = threading.Thread(target=log_processor_worker, daemon=True)
    log_processor_thread.start()

    # Initialize bot
    init_bot()

    # Start server in a separate process
    server_process = Process(target=run_server)
    server_process.start()

    try:
        # Keep main process running
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    finally:
        server_process.terminate()
        server_process.join()


if __name__ == '__main__':
    main()