diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 5875985..8bd5efe 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -1,88 +1,36 @@ import asyncio import uvicorn -from asgiref.wsgi import WsgiToAsgi -import websockets -import json - -import datetime import os from dotenv import load_dotenv - - -from threading import Thread -from solana.rpc.async_api import AsyncClient -from solders.transaction import VersionedTransaction -from solana.rpc.types import TxOpts -from solders.keypair import Keypair -from jupiter_python_sdk.jupiter import Jupiter -from solana.rpc.commitment import Processed - +import datetime +import json +import websockets +import logging from modules.webui import init_app -from modules.storage import init_db, store_transaction from modules.utils import telegram_utils, logging, get_pk from modules.log_processor import watch_for_new_logs from modules.SolanaAPI import SAPI +from config import DO_WATCH_WALLET + +from asgiref.wsgi import WsgiToAsgi +from multiprocessing import Process -# config = load_config() load_dotenv() load_dotenv('.env.secret') -# Configuration - -from config import (DO_WATCH_WALLET, logging, logger) - - - - -# # # # # # # # # # TELEGRAM # # # # # # # # # # -# if not telegram_utils.bot: -# try: -# asyncio.run(telegram_utils.initialize()) -# except Exception as e: -# logging.error(f"Error initializing Telegram bot: {str(e)}") -# async def telegram_utils.send_telegram_message(message): -# try: -# await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) -# logging.info(f"Telegram message sent: {message}") -# # logging.info(f"Telegram message dummy sent: {message}") -# except Exception as e: -# logging.error(f"Error sending Telegram message: {str(e)}") - - - -# # # # # # # # # # DATABASE # # # # # # # # # # - - - -# # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # # - - -# # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # # - - - - - -# # # # # # # # # # Functionality # # # # # # # # # # - - - - - async def save_log(log): try: os.makedirs('./logs', exist_ok=True) timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") filename = f"./logs/log_{timestamp}.json" - with open(filename, 'w') as f: json.dump(log, f, indent=2) except Exception as e: logging.error(f"Error saving RPC log: {e}") - PROCESSING_LOG = False + async def process_log(log_result): global PROCESSING_LOG tr_details = { @@ -101,13 +49,10 @@ async def process_log(log_result): logs = log_result['value']['logs'] try: - # Detect swap operations in logs PROCESSING_LOG = True swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', 'Program log: Instruction: SwapExactAmountIn', 'Program log: Instruction: SwapV2'] - if any(op in logs for op in swap_operations): - # Save the log to a file await save_log(log_result) tx_signature_str = log_result['value']['signature'] @@ -118,29 +63,21 @@ async def process_log(log_result): while i < len(logs): log_entry = logs[i] - # Check if we found the 'order_id' if tr_details["order_id"] is None and "order_id" in log_entry: - # Extract the order_id tr_details["order_id"] = log_entry.split(":")[-1].strip() tr_details["token_in"] = logs[i + 1].split(":")[-1].strip() tr_details["token_out"] = logs[i + 2].split(":")[-1].strip() - # Look for the token change amounts after tokens have been found if "source_token_change" in log_entry: parts = log_entry.split(", ") for part in parts: if "source_token_change" in part: - tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6 # Assuming 6 decimals + tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6 elif "destination_token_change" in part: - tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6 # Assuming 6 decimals + tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6 i += 1 - # calculate percentage swapped by digging before_source_balance, source_token_change and after_source_balance - - # "Program log: before_source_balance: 19471871, before_destination_balance: 0, amount_in: 19471871, expect_amount_out: 770877527, min_return: 763168752", - # "Program log: after_source_balance: 0, after_destination_balance: 770570049", - # "Program log: source_token_change: 19471871, destination_token_change: 770570049", if "before_source_balance" in log_entry: parts = log_entry.split(", ") for part in parts: @@ -151,25 +88,16 @@ async def process_log(log_result): for part in parts: if "source_token_change" in part: source_token_change = float(part.split(":")[-1].strip()) / 10 ** 6 - - # GET DETAILS FROM TRANSACTION IF NOT FOUND IN LOGS try: if tr_details["token_in"] is None or tr_details["token_out"] is None or tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0: logging.warning("Incomplete swap details found in logs. Getting details from transaction") tr_details = await SAPI.get_transaction_details_info(tx_signature_str, logs) - # onlt needed if no details got if before_source_balance > 0 and source_token_change > 0: tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100 - #dirty fix for percentage > 100 (decimals 9 but expecting 6) if tr_details["percentage_swapped"] > 100: tr_details["percentage_swapped"] = tr_details["percentage_swapped"] / 1000 - - # update token info: ToDo: check, but already did - # all_token_addresses = list(set([tr_details["token_in"], tr_details["token_out"]])) - # await get_token_prices(all_token_addresses) - try: token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]] token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]] @@ -184,7 +112,7 @@ async def process_log(log_result): message_text = ( f"Swap detected: \n" - f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} ({tr_details['percentage_swapped']:.2f}% ) swapped for " # ({tr_details['token_in']}) ({tr_details['token_out']}) + f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} ({tr_details['percentage_swapped']:.2f}% ) swapped for " f"{tr_details['symbol_out']} \n" ) await telegram_utils.send_telegram_message(message_text) @@ -195,16 +123,12 @@ async def process_log(log_result): logging.error(f"Error aquiring log details and following: {e}") await telegram_utils.send_telegram_message(f"Not followed! Error following move.") - - except Exception as e: logging.error(f"Error processing log: {e}") - PROCESSING_LOG = False return tr_details - async def process_messages(websocket): try: while True: @@ -230,7 +154,6 @@ async def process_messages(websocket): except websockets.exceptions.ConnectionClosedError as e: logger.error(f"Connection closed unexpectedly: {e}") - # await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") pass except json.JSONDecodeError as e: logger.error(f"Failed to decode JSON: {e}") @@ -238,10 +161,8 @@ async def process_messages(websocket): logger.error(f"An unexpected error occurred: {e}") -pk = None +logger = logging.getLogger(__name__) app = init_app() -# app = init_app(follow_move_legacy) -# Convert Flask app to ASGI asgi_app = WsgiToAsgi(app) async def restartable_task(task_func, *args, **kwargs): @@ -249,43 +170,50 @@ async def restartable_task(task_func, *args, **kwargs): try: await task_func(*args, **kwargs) except Exception as e: - logging.error(f"Error in task {task_func.__name__}: {e}") - await telegram_utils.send_telegram_message(f"Error in task {task_func.__name__}: {e}") - await asyncio.sleep(5) # Wait before retrying - -async def main(): - global bot, PROCESSING_LOG, pk + error_msg = f"Error in task {task_func.__name__}: {e}" + logger.error(error_msg) + await telegram_utils.send_telegram_message(error_msg) + await asyncio.sleep(5) +async def init_bot(): + # Initialize bot components pk = await get_pk() await telegram_utils.initialize() await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") - - # Start the log processor + + # Start monitoring tasks asyncio.create_task(restartable_task(watch_for_new_logs)) - # process_transaction if DO_WATCH_WALLET: asyncio.create_task(restartable_task(SAPI.wallet_watch_loop)) - await SAPI.wallet_watch_loop() - -def run_asyncio_tasks(): - asyncio.run(main()) -if __name__ == '__main__': - import multiprocessing - - # Start the asyncio tasks in a separate process - process = multiprocessing.Process(target=run_asyncio_tasks) - process.start() - - # Run the ASGI server +def run_server(): uvicorn.run( "app:asgi_app", host="0.0.0.0", port=3001, - log_level="info", # "debug" + log_level="info", reload=True ) - # Wait for the asyncio tasks to complete - process.join() +async def main(): + # Start bot initialization in background + bot_task = asyncio.create_task(init_bot()) + + # Start server in a separate process + server_process = Process(target=run_server) + server_process.start() + + try: + # Wait for bot initialization + await bot_task + # Keep main process running + await asyncio.Event().wait() + except KeyboardInterrupt: + logger.info("Shutting down...") + finally: + server_process.terminate() + server_process.join() + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py index d37429c..d99b13c 100644 --- a/crypto/sol/modules/webui.py +++ b/crypto/sol/modules/webui.py @@ -1,7 +1,7 @@ -import asyncio import os import sys -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import asyncio +from concurrent.futures import ThreadPoolExecutor from flask import Flask, jsonify, request, render_template, redirect, url_for # from flask_oauthlib.client import OAuth @@ -23,6 +23,7 @@ def init_app(tr_handler=None): global on_transaction on_transaction = tr_handler app = Flask(__name__, template_folder='../templates', static_folder='../static') + executor = ThreadPoolExecutor(max_workers=10) # Adjust the number of workers as needed login_manager = LoginManager(app) login_manager.login_view = 'login' @@ -130,7 +131,7 @@ def init_app(tr_handler=None): # await process_wh(request_data) # don't wait for the process to finish - asyncio.create_task(process_wh(request_data )) + executor.submit(asyncio.run, process_wh(request_data)) return jsonify({"status": "OK"}), 200 except Exception as e: logging.error(f"Error processing webhook: {e}") @@ -138,7 +139,7 @@ def init_app(tr_handler=None): # Flask route to retry processing the last log - async def process_wh( data): + async def process_wh(data): global on_transaction try: @@ -379,4 +380,4 @@ def get_latest_log_file(wh:bool): utils.log.error(f"Error fetching latest log file: {e}") return None -export = init_app \ No newline at end of file +export = init_app