From 98364fc1dabc6c7a0c6208d48f89788169e48ef7 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 21 Oct 2024 17:56:38 +0300 Subject: [PATCH] refactoring wip broken --- crypto/sol/app.py | 132 ++------------------------------ crypto/sol/modules/SolanaAPI.py | 36 +-------- crypto/sol/modules/utils.py | 71 ++++++++++++++++- crypto/sol/modules/webui.py | 70 ++++++++++++++++- 4 files changed, 150 insertions(+), 159 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 6cb9b79..636d574 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -28,8 +28,6 @@ from dexscreener import DexscreenerClient from solana.rpc.types import TokenAccountOpts, TxOpts import datetime -import logging -from logging.handlers import RotatingFileHandler import base64 import os from dotenv import load_dotenv,set_key @@ -39,10 +37,15 @@ import requests import re from typing import List, Dict, Any, Tuple import random -import time +from threading import Thread app = Flask(__name__) +from modules.webui import init_app +from modules.storage import init_db, store_transaction +from modules.utils import telegram_utils, logging + + # config = load_config() load_dotenv() load_dotenv('.env.secret') @@ -56,124 +59,6 @@ SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') FOLLOW_AMOUNT = os.getenv('FOLLOW_AMOUNT', 'percentage') -logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG) -#logging.basicConfig(level=logging.INFO) - -# Set up error logger -log_dir = './logs' -log_file = os.path.join(log_dir, 'error.log') -os.makedirs(log_dir, exist_ok=True) -error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5) -error_file_handler.setLevel(logging.ERROR) -error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') ) -error_file_handler.formatter.converter = time.localtime -error_logger = logging.getLogger('error_logger') -error_logger.setLevel(logging.ERROR) -error_logger.addHandler(error_file_handler) - - -# Set up success logger for accounting CSV -class CSVFormatter(logging.Formatter): - def __init__(self): - super().__init__() - self.output = None - - def format(self, record): - if self.output is None: - self.output = csv.writer(record.stream) - self.output.writerow(['Timestamp', 'Token In', 'Token Out', 'Amount In', 'Amount Out', 'USD Value In', 'USD Value Out', 'Transaction Hash', 'Wallet Address']) - self.output.writerow([ - self.formatTime(record, self.datefmt), - record.token_in, - record.token_out, - record.amount_in, - record.amount_out, - record.usd_value_in, - record.usd_value_out, - record.tx_hash, - record.wallet_address - ]) - return '' - -def log_successful_swap(token_in, token_out, amount_in, amount_out, usd_value_in, usd_value_out, tx_hash, wallet_address): - success_logger_accounting_csv.info('', extra={ - 'token_in': token_in, - 'token_out': token_out, - 'amount_in': amount_in, - 'amount_out': amount_out, - 'usd_value_in': usd_value_in, - 'usd_value_out': usd_value_out, - 'tx_hash': tx_hash, - 'wallet_address': wallet_address - }) - -success_log_file = os.path.join(log_dir, 'successful_swaps.csv') -success_file_handler = RotatingFileHandler(success_log_file, maxBytes=10*1024*1024, backupCount=5) -success_file_handler.setFormatter(CSVFormatter()) -success_logger_accounting_csv = logging.getLogger('success_logger_accounting_csv') -success_logger_accounting_csv.setLevel(logging.INFO) -success_logger_accounting_csv.addHandler(success_file_handler) - - -# Function to find the latest log file -def get_latest_log_file(): - log_dir = './logs' - try: - # files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))] - # filter files mask log_20241005_004103_143116.json - files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('log_')] - - latest_file = max(files, key=lambda x: os.path.getmtime(os.path.join(log_dir, x))) - return os.path.join(log_dir, latest_file) - except Exception as e: - logging.error(f"Error fetching latest log file: {e}") - return None - -# Flask route to retry processing the last log -@app.route('/retry', methods=['GET']) -@app.route('/retry-last-log', methods=['GET']) -async def retry_last_log(): - latest_log_file = get_latest_log_file() - if not latest_log_file: - return jsonify({"error": "No log files found"}), 404 - - try: - logger.info(f"Processing latest log file: {latest_log_file}") - with open(latest_log_file, 'r') as f: - log = json.load(f) - - result = await process_log(log) - - return jsonify({ - "file": latest_log_file, - "status": "Log dump processed successfully", - "result": result - }), 200 - - except Exception as e: - logging.error(f"Error processing log dump: {e}") - return jsonify({"error": "Failed to process log"}), 500 - - -#const webhookPath = `/tr/${followedWallet.toBase58()}/${logs.signature}`; -@app.route('/tr//', methods=['GET', 'POST']) -async def transaction_notified(wallet, tx_signature): - try: - logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}") - # Process the transaction - # tr = await get_swap_transaction_details(tx_signature) - tr = await get_transaction_details_info(tx_signature, []) - # ToDo - probably optimize - await get_token_metadata_symbol(tr['token_in']) - await get_token_metadata_symbol(tr['token_out']) - await follow_move(tr) - await save_token_info() - return jsonify(tr), 200 - except Exception as e: - logging.error(f"Error processing transaction: {e}") - return jsonify({"error": "Failed to process transaction"}), 500 - # Configuration @@ -536,7 +421,7 @@ async def get_transaction_details_with_retry(transaction_id, retry_delay = 5, ma # query every 5 seconds for the transaction details until not None or 30 seconds for _ in range(max_retries): try: - tx_details = await solanaAPI.get_transaction_details_rpc(transaction_id) + tx_details = await get_transaction_details_rpc(transaction_id) if tx_details is not None: break except Exception as e: @@ -1054,7 +939,6 @@ async def check_PK(): # Convert Flask app to ASGI asgi_app = WsgiToAsgi(app) -solanaAPI = SolanaAPI() async def main(): global solanaAPI, bot, PROCESSING_LOG @@ -1063,7 +947,7 @@ async def main(): await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") await check_PK() # new: restart wallet_watch_loop every hour - await solanaAPI.wallet_watch_loop() + await wallet_watch_loop() # while True: # wallet_watch_task = asyncio.create_task(solanaAPI.wallet_watch_loop()) diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index 7f5cda2..84c31f5 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -198,10 +198,6 @@ class SolanaAPI: await self.on_bot_message("Reconnecting...") await asyncio.sleep(5) - async def process_transaction(self, signature): - print(f"Processing transaction: {signature['signature']}") - # Add your transaction processing logic here - async def get_last_transactions(self, account_address, check_interval=300, limit=1000): last_check_time = None last_signature = None @@ -220,7 +216,7 @@ class SolanaAPI: if last_signature: params[1]["before"] = last_signature - result = await solana_jsonrpc("getSignaturesForAddress", params) + result = await self.solana_ws.solana_jsonrpc("getSignaturesForAddress", params) if result: for signature in result: @@ -243,7 +239,7 @@ class SolanaAPI: return TOKENS_INFO[mint_address].get('symbol') try: - account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", mint_address) + account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", mint_address) if 'value' in account_data_result and 'data' in account_data_result['value']: account_data_data = account_data_result['value']['data'] if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: @@ -281,7 +277,7 @@ class SolanaAPI: transaction_details = json.load(f) return transaction_details else: - transaction_details = await solana_jsonrpc("getTransaction", tx_signature) + transaction_details = await self.solana_ws.solana_jsonrpc("getTransaction", tx_signature) with open('./logs/transation_details.json', 'w') as f: json.dump(transaction_details, f, indent=2) @@ -357,7 +353,7 @@ class SolanaAPI: for transfer in [transfers[0], transfers[-1]]: if transfer['mint'] is None: # Query the Solana network for the account data - account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source']) + account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", transfer['source']) if 'value' in account_data_result and 'data' in account_data_result['value']: account_data_value = account_data_result['value'] @@ -721,27 +717,3 @@ class SolanaDEX: # save token info to file await save_token_info() - - -#example -# async def main(): -# await telegram_utils.initialize() - -# async def process_log(log): -# print(f"Processing log: {log}") - -# async def list_initial_wallet_states(): -# print("Listing initial wallet states") - - -# wallet_watch_task = asyncio.create_task(solana_api.wallet_watch_loop()) - -# try: -# await asyncio.gather(wallet_watch_task) -# except asyncio.CancelledError: -# pass -# finally: -# await telegram_utils.close() - -# if __name__ == "__main__": -# asyncio.run(main()) \ No newline at end of file diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py index 6ed7ab7..8c8edaf 100644 --- a/crypto/sol/modules/utils.py +++ b/crypto/sol/modules/utils.py @@ -9,6 +9,11 @@ from telegram import Bot from telegram.constants import ParseMode from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME + +import time +import logging +from logging.handlers import RotatingFileHandler + class TelegramUtils: def __init__(self): self.bot = None @@ -37,7 +42,71 @@ class TelegramUtils: if self.conn_pool: await self.conn_pool.close() + +class Log: + # Set up success logger for accounting CSV + class CSVFormatter(logging.Formatter): + def __init__(self): + super().__init__() + self.output = None + + def format(self, record): + if self.output is None: + self.output = csv.writer(record.stream) + self.output.writerow(['Timestamp', 'Token In', 'Token Out', 'Amount In', 'Amount Out', 'USD Value In', 'USD Value Out', 'Transaction Hash', 'Wallet Address']) + self.output.writerow([ + self.formatTime(record, self.datefmt), + record.token_in, + record.token_out, + record.amount_in, + record.amount_out, + record.usd_value_in, + record.usd_value_out, + record.tx_hash, + record.wallet_address + ]) + return '' + + def __init__(self): + logger = logging.getLogger(__name__) + logging.basicConfig(level=logging.DEBUG) + #logging.basicConfig(level=logging.INFO) + + # Set up error logger + log_dir = './logs' + log_file = os.path.join(log_dir, 'error.log') + os.makedirs(log_dir, exist_ok=True) + error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5) + error_file_handler.setLevel(logging.ERROR) + error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') ) + error_file_handler.formatter.converter = time.localtime + error_logger = logging.getLogger('error_logger') + error_logger.setLevel(logging.ERROR) + error_logger.addHandler(error_file_handler) + success_log_file = os.path.join(log_dir, 'successful_swaps.csv') + success_file_handler = RotatingFileHandler(success_log_file, maxBytes=10*1024*1024, backupCount=5) + success_file_handler.setFormatter(self.CSVFormatter()) + success_logger_accounting_csv = logging.getLogger('success_logger_accounting_csv') + success_logger_accounting_csv.setLevel(logging.INFO) + success_logger_accounting_csv.addHandler(success_file_handler) + + + def log_successful_swap(token_in, token_out, amount_in, amount_out, usd_value_in, usd_value_out, tx_hash, wallet_address): + success_logger_accounting_csv.info('', extra={ + 'token_in': token_in, + 'token_out': token_out, + 'amount_in': amount_in, + 'amount_out': amount_out, + 'usd_value_in': usd_value_in, + 'usd_value_out': usd_value_out, + 'tx_hash': tx_hash, + 'wallet_address': wallet_address + }) + + + + # Create a global instance of TelegramUtils telegram_utils = TelegramUtils() - +log = Log() # You can add more Telegram-related methods to the TelegramUtils class if needed \ No newline at end of file diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py index 4dee698..b83b944 100644 --- a/crypto/sol/modules/webui.py +++ b/crypto/sol/modules/webui.py @@ -106,7 +106,73 @@ def init_app(): def get_holdings(wallet_id): holdings = storage.get_holdings(wallet_id) return jsonify(holdings) + + + # Flask route to retry processing the last log + @app.route('/retry', methods=['GET']) + @app.route('/retry-last-log', methods=['GET']) + async def retry_last_log(): + latest_log_file = get_latest_log_file() + if not latest_log_file: + return jsonify({"error": "No log files found"}), 404 -# Implement other routes for reports, price alerts, following accounts, etc. + try: + logger.info(f"Processing latest log file: {latest_log_file}") + with open(latest_log_file, 'r') as f: + log = json.load(f) - return app \ No newline at end of file + result = await process_log(log) + + return jsonify({ + "file": latest_log_file, + "status": "Log dump processed successfully", + "result": result + }), 200 + + except Exception as e: + logging.error(f"Error processing log dump: {e}") + return jsonify({"error": "Failed to process log"}), 500 + + + + + + #const webhookPath = `/tr/${followedWallet.toBase58()}/${logs.signature}`; + @app.route('/tr//', methods=['GET', 'POST']) + async def transaction_notified(wallet, tx_signature): + try: + logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}") + # Process the transaction + # tr = await get_swap_transaction_details(tx_signature) + tr = await get_transaction_details_info(tx_signature, []) + # ToDo - probably optimize + await get_token_metadata_symbol(tr['token_in']) + await get_token_metadata_symbol(tr['token_out']) + await follow_move(tr) + await save_token_info() + return jsonify(tr), 200 + except Exception as e: + logging.error(f"Error processing transaction: {e}") + return jsonify({"error": "Failed to process transaction"}), 500 + + + + + + + + return app + +# Function to find the latest log file +def get_latest_log_file(): + log_dir = './logs' + try: + # files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))] + # filter files mask log_20241005_004103_143116.json + files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('log_')] + + latest_file = max(files, key=lambda x: os.path.getmtime(os.path.join(log_dir, x))) + return os.path.join(log_dir, latest_file) + except Exception as e: + logging.error(f"Error fetching latest log file: {e}") + return None