From fee5b516a102cf070b13466668938c6aa8735dec Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Fri, 15 Nov 2024 00:22:20 +0200 Subject: [PATCH] wip (broken) --- crypto/sol/app.py | 189 +++++------------------- crypto/sol/modules/SolanaAPI.py | 16 +-- crypto/sol/modules/log_processor.py | 214 ++++++++++++++++++++++------ crypto/sol/modules/webui.py | 4 +- 4 files changed, 212 insertions(+), 211 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 3d6ecb6..2084876 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -1,180 +1,59 @@ import asyncio -import datetime -import json import logging -import os -from typing import Dict, Any - import uvicorn +from fastapi import FastAPI +from fastapi.middleware.wsgi import WSGIMiddleware + from asgiref.wsgi import WsgiToAsgi from dotenv import load_dotenv - +from modules.webui import init_app +from modules.utils import telegram_utils from config import DO_WATCH_WALLET from modules.SolanaAPI import SAPI -from modules.log_processor import watch_for_new_logs -from modules.utils import telegram_utils -from modules.webui import init_app, teardown_app +from modules.log_processor import LogProcessor -# Load environment variables load_dotenv() load_dotenv('.env.secret') - -# Configure logging logger = logging.getLogger(__name__) -class LogProcessor: - @staticmethod - def save_log(log: Dict[str, Any]) -> None: - """Save log to JSON file with timestamp.""" - try: - os.makedirs('./logs', exist_ok=True) - timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") - filename = f"./logs/log_{timestamp}.json" - with open(filename, 'w') as f: - json.dump(log, f, indent=2) - except Exception as e: - logger.error(f"Error saving RPC log: {e}") - - @staticmethod - def extract_transaction_details(logs: list) -> Dict[str, Any]: - """Extract transaction details from logs.""" - tr_details = { - "order_id": None, - "token_in": None, - "token_out": None, - "amount_in": 0, - "amount_out": 0, - "amount_in_USD": 0, - "amount_out_USD": 0, - "percentage_swapped": 0 - } - - before_source_balance = 0 - source_token_change = 0 - - for i, log_entry in enumerate(logs): - if tr_details["order_id"] is None and "order_id" in log_entry: - tr_details["order_id"] = log_entry.split(":")[-1].strip() - tr_details["token_in"] = logs[i + 1].split(":")[-1].strip() - tr_details["token_out"] = logs[i + 2].split(":")[-1].strip() - - if "source_token_change" in log_entry: - parts = log_entry.split(", ") - for part in parts: - if "source_token_change" in part: - tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6 - elif "destination_token_change" in part: - tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6 - - if "before_source_balance" in log_entry: - before_source_balance = float(log_entry.split(":")[-1].strip()) / 10 ** 6 - if "source_token_change" in log_entry: - source_token_change = float(log_entry.split(":")[-1].strip()) / 10 ** 6 - - if before_source_balance > 0 and source_token_change > 0: - tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100 - if tr_details["percentage_swapped"] > 100: - tr_details["percentage_swapped"] /= 1000 - - return tr_details - - @staticmethod - async def process_log(log_result: Dict[str, Any]) -> Dict[str, Any]: - """Process a single log entry.""" - if log_result['value']['err']: - return - - logs = log_result['value']['logs'] - swap_operations = [ - 'Program log: Instruction: Swap', - 'Program log: Instruction: Swap2', - 'Program log: Instruction: SwapExactAmountIn', - 'Program log: Instruction: SwapV2' - ] - - try: - if not any(op in logs for op in swap_operations): - return - - LogProcessor.save_log(log_result) - tx_signature = log_result['value']['signature'] - tr_details = LogProcessor.extract_transaction_details(logs) - - if not all([tr_details["token_in"], tr_details["token_out"], - tr_details["amount_in"], tr_details["amount_out"]]): - tr_details = await SAPI.get_transaction_details_info(tx_signature, logs) - - # Update token information - token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]] - token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]] - - tr_details.update({ - "symbol_in": token_in.get('symbol'), - "symbol_out": token_out.get('symbol'), - "amount_in_USD": tr_details['amount_in'] * token_in.get('price', 0), - "amount_out_USD": tr_details['amount_out'] * token_out.get('price', 0) - }) - - # Send notification - message = ( - f"Swap detected: \n" - f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} " - f"({tr_details['percentage_swapped']:.2f}% ) swapped for {tr_details['symbol_out']}" - ) - await telegram_utils.send_telegram_message(message) - - # Follow up actions - await SAPI.follow_move(tr_details) - await SAPI.save_token_info() - - except Exception as e: - logger.error(f"Error processing log: {e}") - await telegram_utils.send_telegram_message("Not followed! Error following move.") - - return tr_details - class Bot: @staticmethod async def initialize(): - """Initialize bot and start monitoring.""" await telegram_utils.initialize() - await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") - - asyncio.create_task(watch_for_new_logs()) + await telegram_utils.send_telegram_message("Solana Agent Started") + asyncio.create_task(LogProcessor.watch_for_new_logs()) if DO_WATCH_WALLET: asyncio.create_task(SAPI.wallet_watch_loop()) -async def start_server(): - """Run the ASGI server.""" - config = uvicorn.Config( +app = init_app() +asgi_app = WsgiToAsgi(app) + +async def main(): + global bot, PROCESSING_LOG, pk + + await telegram_utils.initialize() + await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") + # process_transaction + await SAPI.wallet_watch_loop() + +def run_asyncio_tasks(): + asyncio.run(main()) + +if __name__ == '__main__': + import multiprocessing + + # Start the asyncio tasks in a separate process + process = multiprocessing.Process(target=run_asyncio_tasks) + process.start() + + # Run the ASGI server + uvicorn.run( "app:asgi_app", host="0.0.0.0", port=3001, - log_level="info", + log_level="debug", reload=True ) - server = uvicorn.Server(config) - await server.serve() -async def main(): - """Main application entry point.""" - # Initialize app and create ASGI wrapper - app = await init_app() - global asgi_app - asgi_app = WsgiToAsgi(app) - - # Initialize bot - await Bot.initialize() - - # Start server - try: - await start_server() - except KeyboardInterrupt: - logger.info("Shutting down...") - await teardown_app() - -if __name__ == '__main__': - try: - asyncio.run(main()) - except KeyboardInterrupt: - logger.info("Application terminated by user") \ No newline at end of file + # Wait for the asyncio tasks to complete + process.join() diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index cbce708..0be36c7 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -84,7 +84,6 @@ class SolanaWS: self.subscription_id = None self.message_queue = asyncio.Queue() self.on_message = on_message - self.websocket = None self.last_msg_responded = False async def save_log(log): @@ -100,8 +99,6 @@ class SolanaWS: async def connect(self): while True: - if self.websocket is None or self.websocket.closed: - await self.connect() try: current_url = random.choice(SOLANA_ENDPOINTS) self.websocket = await websockets.connect(current_url, ping_interval=30, ping_timeout=10) @@ -178,8 +175,10 @@ class SolanaWS: response_data = json.loads(response) self.last_msg_responded = True - if 'result' in response_data: + if 'params' in response_data and 'result' in response_data["params"]: await self.message_queue.put(response_data['result']) + else: + logger.warning(f"Unexpected response: {response_data}") if one: break except websockets.exceptions.ConnectionClosedError: @@ -285,15 +284,16 @@ class SolanaAPI: await solana_ws.connect() await solana_ws.subscribe() + receive_task = asyncio.create_task(solana_ws.receive_messages()) + process_task = asyncio.create_task(solana_ws.process_messages()) + + if first_subscription: first_subscription = False await async_safe_call(self.on_initial_subscription, solana_ws.subscription_id) await async_safe_call(self.on_bot_message,f"Solana mainnet connected ({solana_ws.subscription_id})...") - receive_task = asyncio.create_task(solana_ws.receive_messages()) - process_task = asyncio.create_task(solana_ws.process_messages()) - try: await asyncio.gather(receive_task, process_task) except asyncio.CancelledError: @@ -1211,7 +1211,7 @@ class SolanaDEX: if mint in self.TOKENS_INFO: token_name = self.TOKENS_INFO[mint].get('symbol') elif doGetTokenName: - token_name = await self.get_token_metadata_symbol(mint) or 'N/A' + token_name = await SAPI.get_token_metadata_symbol(mint) or 'N/A' self.TOKENS_INFO[mint] = {'symbol': token_name} await asyncio.sleep(2) diff --git a/crypto/sol/modules/log_processor.py b/crypto/sol/modules/log_processor.py index b23a90a..2f9362d 100644 --- a/crypto/sol/modules/log_processor.py +++ b/crypto/sol/modules/log_processor.py @@ -1,60 +1,182 @@ +from asyncio.log import logger +import datetime +import json import os import asyncio from pathlib import Path +from typing import Any, Dict + +from .utils import TelegramUtils from .storage import Storage -from .SolanaAPI import SolanaAPI +from .SolanaAPI import SAPI, SolanaAPI LOG_DIRECTORY = "./logs" FILE_MASK = "wh_*.json" -async def process_log_file(file_path): - # Read the file and extract transaction data - with open(file_path, 'r') as file: - data = file.read() - # Assume data is parsed into these variables - wallet_id = "extracted_wallet_id" - transaction_type = "extracted_transaction_type" - sell_currency = "extracted_sell_currency" - sell_amount = 0.0 - sell_value = 0.0 - buy_currency = "extracted_buy_currency" - buy_amount = 0.0 - buy_value = 0.0 - solana_signature = "extracted_solana_signature" - details = {} - # Process the webhook data - solana_api = SolanaAPI() - transaction_data = await solana_api.process_wh(data) - # Check if the transaction already exists - existing_transaction = await Storage.get_prisma().transaction.find_first( - where={'solana_signature': solana_signature} - ) +class LogProcessor: + @staticmethod + def save_log(log: Dict[str, Any]) -> None: + """Save log to JSON file with timestamp.""" + try: + os.makedirs('./logs', exist_ok=True) + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") + filename = f"./logs/log_{timestamp}.json" + with open(filename, 'w') as f: + json.dump(log, f, indent=2) + except Exception as e: + logger.error(f"Error saving RPC log: {e}") - if not existing_transaction: - # Store the transaction if it doesn't exist - transaction_data = { - 'wallet_id': wallet_id, - 'type': transaction_type, - 'sell_currency': sell_currency, - 'sell_amount': sell_amount, - 'sell_value': sell_value, - 'buy_currency': buy_currency, - 'buy_amount': buy_amount, - 'buy_value': buy_value, - 'solana_signature': solana_signature, - 'details': details + @staticmethod + def extract_transaction_details(logs: list) -> Dict[str, Any]: + """Extract transaction details from logs.""" + tr_details = { + "order_id": None, + "token_in": None, + "token_out": None, + "amount_in": 0, + "amount_out": 0, + "amount_in_USD": 0, + "amount_out_USD": 0, + "percentage_swapped": 0 } - storage = Storage() - await storage.store_transaction(transaction_data) - # Rename the file to append '_saved' - new_file_path = file_path.with_name(file_path.stem + "_saved" + file_path.suffix) - os.rename(file_path, new_file_path) + before_source_balance = 0 + source_token_change = 0 -async def watch_for_new_logs(): - while True: - for file_path in Path(LOG_DIRECTORY).glob(FILE_MASK): - await process_log_file(file_path) - await asyncio.sleep(10) # Check every 10 seconds + for i, log_entry in enumerate(logs): + if tr_details["order_id"] is None and "order_id" in log_entry: + tr_details["order_id"] = log_entry.split(":")[-1].strip() + tr_details["token_in"] = logs[i + 1].split(":")[-1].strip() + tr_details["token_out"] = logs[i + 2].split(":")[-1].strip() + + if "source_token_change" in log_entry: + parts = log_entry.split(", ") + for part in parts: + if "source_token_change" in part: + tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6 + elif "destination_token_change" in part: + tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6 + + if "before_source_balance" in log_entry: + before_source_balance = float(log_entry.split(":")[-1].strip()) / 10 ** 6 + if "source_token_change" in log_entry: + source_token_change = float(log_entry.split(":")[-1].strip()) / 10 ** 6 + + if before_source_balance > 0 and source_token_change > 0: + tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100 + if tr_details["percentage_swapped"] > 100: + tr_details["percentage_swapped"] /= 1000 + + return tr_details + + @staticmethod + async def process_log(log_result: Dict[str, Any]) -> Dict[str, Any]: + """Process a single log entry.""" + if log_result['value']['err']: + return + + logs = log_result['value']['logs'] + swap_operations = [ + 'Program log: Instruction: Swap', + 'Program log: Instruction: Swap2', + 'Program log: Instruction: SwapExactAmountIn', + 'Program log: Instruction: SwapV2' + ] + + try: + if not any(op in logs for op in swap_operations): + return + + LogProcessor.save_log(log_result) + tx_signature = log_result['value']['signature'] + tr_details = LogProcessor.extract_transaction_details(logs) + + if not all([tr_details["token_in"], tr_details["token_out"], + tr_details["amount_in"], tr_details["amount_out"]]): + tr_details = await SAPI.get_transaction_details_info(tx_signature, logs) + + # Update token information + token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]] + token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]] + + tr_details.update({ + "symbol_in": token_in.get('symbol'), + "symbol_out": token_out.get('symbol'), + "amount_in_USD": tr_details['amount_in'] * token_in.get('price', 0), + "amount_out_USD": tr_details['amount_out'] * token_out.get('price', 0) + }) + + # Send notification + message = ( + f"Swap detected: \n" + f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} " + f"({tr_details['percentage_swapped']:.2f}% ) swapped for {tr_details['symbol_out']}" + ) + await TelegramUtils.send_telegram_message(message) + + # Follow up actions + await SAPI.follow_move(tr_details) + await SAPI.save_token_info() + + except Exception as e: + logger.error(f"Error processing log: {e}") + await TelegramUtils.send_telegram_message("Not followed! Error following move.") + + return tr_details + + + @staticmethod + async def process_log_fileDump(self, file_path): + # Read the file and extract transaction data + with open(file_path, 'r') as file: + data = file.read() + # Assume data is parsed into these variables + wallet_id = "extracted_wallet_id" + transaction_type = "extracted_transaction_type" + sell_currency = "extracted_sell_currency" + sell_amount = 0.0 + sell_value = 0.0 + buy_currency = "extracted_buy_currency" + buy_amount = 0.0 + buy_value = 0.0 + solana_signature = "extracted_solana_signature" + details = {} + + # Process the webhook data + solana_api = SolanaAPI() + transaction_data = await solana_api.process_wh(data) + + # Check if the transaction already exists + existing_transaction = await Storage.get_prisma().transaction.find_first( + where={'solana_signature': solana_signature} + ) + + if not existing_transaction: + # Store the transaction if it doesn't exist + transaction_data = { + 'wallet_id': wallet_id, + 'type': transaction_type, + 'sell_currency': sell_currency, + 'sell_amount': sell_amount, + 'sell_value': sell_value, + 'buy_currency': buy_currency, + 'buy_amount': buy_amount, + 'buy_value': buy_value, + 'solana_signature': solana_signature, + 'details': details + } + storage = Storage() + await storage.store_transaction(transaction_data) + + # Rename the file to append '_saved' + new_file_path = file_path.with_name(file_path.stem + "_saved" + file_path.suffix) + os.rename(file_path, new_file_path) + + @staticmethod + async def watch_for_new_logs(): + while True: + for file_path in Path(LOG_DIRECTORY).glob(FILE_MASK): + await LogProcessor.process_log_fileDump(file_path) + await asyncio.sleep(10) # Check every 10 seconds diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py index 53a8c60..3449343 100644 --- a/crypto/sol/modules/webui.py +++ b/crypto/sol/modules/webui.py @@ -21,7 +21,7 @@ from datetime import datetime on_transaction = None -async def init_app(tr_handler=None): +def init_app(tr_handler=None): global on_transaction on_transaction = tr_handler app = Flask(__name__, template_folder='../templates', static_folder='../static') @@ -38,7 +38,7 @@ async def init_app(tr_handler=None): if not storage.is_connected(): await storage.connect() - asyncio.run(ensure_storage_connection()) + ensure_storage_connection() # oauth = OAuth(app) # google = oauth.remote_app(