"""Processing of Solana RPC log notifications and dumped webhook files."""

# NOTE(review): this imports asyncio's own package logger rather than a
# module-specific one; kept so existing logging configuration keeps working.
from asyncio.log import logger
import datetime
import json
import os
import asyncio
from pathlib import Path
from typing import Any, Dict, Optional

from .utils import TelegramUtils
from .storage import Storage
from .SolanaAPI import SAPI, SolanaAPI

LOG_DIRECTORY = "./logs"
FILE_MASK = "wh_*.json"

# Suffix appended to a dump file's stem once it has been processed.
SAVED_SUFFIX = "_saved"

# Raw amounts found in the logs are divided by this factor.
# NOTE(review): presumably assumes 6-decimal tokens - confirm for other mints.
TOKEN_AMOUNT_SCALE = 10 ** 6


class LogProcessor:
    """Static helpers that persist, parse and act on swap-related logs."""

    # Log lines that mark a transaction as containing a swap instruction.
    _SWAP_OPERATIONS = (
        'Program log: Instruction: Swap',
        'Program log: Instruction: Swap2',
        'Program log: Instruction: SwapExactAmountIn',
        'Program log: Instruction: SwapV2',
    )

    @staticmethod
    def save_log(log: Dict[str, Any]) -> None:
        """Save log to a timestamped JSON file inside ``LOG_DIRECTORY``.

        Best effort: any failure is logged and swallowed so that a disk
        problem never interrupts log processing.
        """
        try:
            os.makedirs(LOG_DIRECTORY, exist_ok=True)
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            filename = Path(LOG_DIRECTORY) / f"log_{timestamp}.json"
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(log, f, indent=2)
        except Exception as e:
            logger.error(f"Error saving RPC log: {e}")

    @staticmethod
    def _last_value(text: str) -> str:
        """Return the stripped text that follows the last ``:`` in *text*."""
        return text.split(":")[-1].strip()

    @staticmethod
    def _field_amount(log_entry: str, key: str) -> Optional[float]:
        """Return the scaled numeric value of *key* in a ``k: v, k: v`` line.

        The line is split on ``", "`` and the first part containing *key*
        is parsed, so a value is never read from a neighbouring field.
        Returns ``None`` when no part contains *key*.

        Raises:
            ValueError: if the matching part does not end in a number.
        """
        for part in log_entry.split(", "):
            if key in part:
                return float(LogProcessor._last_value(part)) / TOKEN_AMOUNT_SCALE
        return None

    @staticmethod
    def extract_transaction_details(logs: list) -> Dict[str, Any]:
        """Extract transaction details from logs.

        Args:
            logs: log lines (strings) of one transaction.

        Returns:
            A dict with ``order_id``, ``token_in``, ``token_out``,
            ``amount_in``, ``amount_out``, ``amount_in_USD``,
            ``amount_out_USD`` and ``percentage_swapped``. Fields that
            cannot be found keep their defaults (``None`` / ``0``).

        Raises:
            ValueError: if a recognised amount field is not numeric.
        """
        tr_details = {
            "order_id": None,
            "token_in": None,
            "token_out": None,
            "amount_in": 0,
            "amount_out": 0,
            "amount_in_USD": 0,
            "amount_out_USD": 0,
            "percentage_swapped": 0
        }

        before_source_balance = 0
        source_token_change = 0

        for i, log_entry in enumerate(logs):
            if tr_details["order_id"] is None and "order_id" in log_entry:
                tr_details["order_id"] = LogProcessor._last_value(log_entry)
                # The two lines after the order id carry the token
                # identifiers; guard against a truncated log list instead of
                # raising IndexError.
                if i + 1 < len(logs):
                    tr_details["token_in"] = LogProcessor._last_value(logs[i + 1])
                if i + 2 < len(logs):
                    tr_details["token_out"] = LogProcessor._last_value(logs[i + 2])

            if "source_token_change" in log_entry:
                amount_in = LogProcessor._field_amount(
                    log_entry, "source_token_change")
                if amount_in is not None:
                    tr_details["amount_in"] = amount_in
                    # Same figure feeds the percentage calculation below.
                    source_token_change = amount_in
                amount_out = LogProcessor._field_amount(
                    log_entry, "destination_token_change")
                if amount_out is not None:
                    tr_details["amount_out"] = amount_out

            if "before_source_balance" in log_entry:
                balance = LogProcessor._field_amount(
                    log_entry, "before_source_balance")
                if balance is not None:
                    before_source_balance = balance

        if before_source_balance > 0 and source_token_change > 0:
            tr_details["percentage_swapped"] = (
                source_token_change / before_source_balance) * 100
            # NOTE(review): legacy heuristic kept unchanged; it rescales
            # implausible (>100%) results. Intent is not visible here.
            if tr_details["percentage_swapped"] > 100:
                tr_details["percentage_swapped"] /= 1000

        return tr_details

    @staticmethod
    async def process_log(log_result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Process a single log entry.

        Failed transactions and transactions without a swap instruction are
        ignored. For a swap, the raw log is saved, details are extracted
        (falling back to ``SAPI.get_transaction_details_info`` when the logs
        are incomplete), a Telegram notification is sent and the move is
        followed.

        Args:
            log_result: notification payload; ``log_result['value']`` must
                provide ``err``, ``logs`` and ``signature``.

        Returns:
            The transaction details gathered so far, or ``None`` when the
            entry was skipped or processing failed before any details were
            available.
        """
        if log_result['value']['err']:
            return None

        logs = log_result['value']['logs']

        # Pre-bind so the final return is safe even when an exception is
        # raised before the details have been extracted.
        tr_details = None

        try:
            if not any(op in logs for op in LogProcessor._SWAP_OPERATIONS):
                return None

            LogProcessor.save_log(log_result)
            tx_signature = log_result['value']['signature']
            tr_details = LogProcessor.extract_transaction_details(logs)

            if not all([tr_details["token_in"], tr_details["token_out"],
                        tr_details["amount_in"], tr_details["amount_out"]]):
                tr_details = await SAPI.get_transaction_details_info(tx_signature, logs)

            # Update token information
            token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]]
            token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]]

            tr_details.update({
                "symbol_in": token_in.get('symbol'),
                "symbol_out": token_out.get('symbol'),
                "amount_in_USD": tr_details['amount_in'] * token_in.get('price', 0),
                "amount_out_USD": tr_details['amount_out'] * token_out.get('price', 0)
            })

            # Send notification
            message = (
                f"Swap detected: \n"
                f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} "
                f"({tr_details['percentage_swapped']:.2f}% ) swapped for {tr_details['symbol_out']}"
            )
            await TelegramUtils.send_telegram_message(message)

            # Follow up actions
            await SAPI.follow_move(tr_details)
            await SAPI.save_token_info()

        except Exception as e:
            logger.error(f"Error processing log: {e}")
            await TelegramUtils.send_telegram_message("Not followed! Error following move.")

        return tr_details

    @staticmethod
    async def process_log_fileDump(self, file_path=None):
        """Process one dumped webhook file and mark it as saved.

        This is a ``@staticmethod`` whose historic signature carries a
        leading ``self`` parameter. Both call styles are accepted:
        ``process_log_fileDump(path)`` and
        ``process_log_fileDump(anything, path)``.

        Args:
            self: ignored placeholder, or the file path when it is the only
                argument supplied.
            file_path: path of the dump file (``str`` or ``Path``).
        """
        if file_path is None:
            # Called with a single positional argument: it is the path.
            file_path = self
        file_path = Path(file_path)

        # Read the file and extract transaction data
        with open(file_path, 'r', encoding='utf-8') as file:
            data = file.read()

        # TODO(review): the values below are placeholders, not data parsed
        # from the file. Because the signature is constant, every file after
        # the first stored one is treated as already existing.
        wallet_id = "extracted_wallet_id"
        transaction_type = "extracted_transaction_type"
        sell_currency = "extracted_sell_currency"
        sell_amount = 0.0
        sell_value = 0.0
        buy_currency = "extracted_buy_currency"
        buy_amount = 0.0
        buy_value = 0.0
        solana_signature = "extracted_solana_signature"
        details = {}

        # Process the webhook data. The result is not used yet; the call is
        # kept because it may have side effects.
        solana_api = SolanaAPI()
        await solana_api.process_wh(data)

        # Check if the transaction already exists
        existing_transaction = await Storage.get_prisma().transaction.find_first(
            where={'solana_signature': solana_signature}
        )

        if not existing_transaction:
            # Store the transaction if it doesn't exist
            transaction_data = {
                'wallet_id': wallet_id,
                'type': transaction_type,
                'sell_currency': sell_currency,
                'sell_amount': sell_amount,
                'sell_value': sell_value,
                'buy_currency': buy_currency,
                'buy_amount': buy_amount,
                'buy_value': buy_value,
                'solana_signature': solana_signature,
                'details': details
            }
            storage = Storage()
            await storage.store_transaction(transaction_data)

        # Rename the file to append '_saved'
        new_file_path = file_path.with_name(
            file_path.stem + SAVED_SUFFIX + file_path.suffix)
        os.rename(file_path, new_file_path)

    @staticmethod
    async def watch_for_new_logs():
        """Poll ``LOG_DIRECTORY`` forever and process new dump files.

        Files whose stem already ends in ``_saved`` are skipped: the renamed
        files still match ``FILE_MASK`` and would otherwise be processed and
        renamed again on every pass. A failure on one file is logged and
        does not stop the watcher.
        """
        while True:
            # Materialise the listing first: files are renamed while we loop.
            pending = sorted(
                path for path in Path(LOG_DIRECTORY).glob(FILE_MASK)
                if not path.stem.endswith(SAVED_SUFFIX)
            )
            for file_path in pending:
                try:
                    await LogProcessor.process_log_fileDump(file_path)
                except Exception:
                    logger.exception("Error processing log file %s", file_path)
            await asyncio.sleep(10)  # Check every 10 seconds