gogo2/crypto/sol/modules/log_processor.py
Dobromir Popov fee5b516a1 wip (broken)
2024-11-15 00:22:20 +02:00

183 lines
6.8 KiB
Python

import asyncio
import datetime
import json
import os
from asyncio.log import logger
from pathlib import Path
from typing import Any, Dict, Optional

from .utils import TelegramUtils
from .storage import Storage
from .SolanaAPI import SAPI, SolanaAPI
# Directory polled by LogProcessor.watch_for_new_logs for dumped files.
LOG_DIRECTORY = "./logs"
# Glob pattern selecting which files in LOG_DIRECTORY get processed.
FILE_MASK = "wh_*.json"
class LogProcessor:
    """Static helpers that parse swap logs, notify, and ingest dumped files.

    Every member is a static method; the class is only used as a namespace.
    """

    @staticmethod
    def save_log(log: Dict[str, Any]) -> None:
        """Save log to a timestamped JSON file under ``./logs``.

        Best effort: any failure is logged and swallowed so that a storage
        problem does not interrupt the caller.
        """
        try:
            os.makedirs('./logs', exist_ok=True)
            # Microsecond resolution makes file name collisions unlikely.
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            filename = f"./logs/log_{timestamp}.json"
            with open(filename, 'w') as f:
                json.dump(log, f, indent=2)
        except Exception as e:
            logger.error(f"Error saving RPC log: {e}")

    @staticmethod
    def _field_amount(log_entry: str, key: str) -> Optional[float]:
        """Return the value of the ``key`` field in a log line, divided by 10**6.

        A line may hold several comma separated ``name: value`` fields, so the
        number is read from the field that contains ``key`` rather than from
        the end of the whole line. Returns ``None`` if no field contains
        ``key``. Raises ``ValueError`` if the field value is not numeric.
        """
        for part in log_entry.split(", "):
            if key in part:
                # NOTE(review): the fixed 10**6 divisor presumably assumes
                # 6-decimal tokens - confirm against the token metadata.
                return float(part.split(":")[-1].strip()) / 10 ** 6
        return None

    @staticmethod
    def extract_transaction_details(logs: list) -> Dict[str, Any]:
        """Extract transaction details from a list of program log lines.

        Args:
            logs: Log strings. The first line containing ``order_id`` is
                expected to be followed by the token-in and token-out lines.

        Returns:
            A dict with ``order_id``, ``token_in``, ``token_out``,
            ``amount_in``, ``amount_out``, ``amount_in_USD``,
            ``amount_out_USD`` and ``percentage_swapped``. Fields that could
            not be found keep their defaults (``None`` or ``0``).
        """
        tr_details = {
            "order_id": None,
            "token_in": None,
            "token_out": None,
            "amount_in": 0,
            "amount_out": 0,
            "amount_in_USD": 0,
            "amount_out_USD": 0,
            "percentage_swapped": 0
        }
        before_source_balance = 0
        source_token_change = 0

        for i, log_entry in enumerate(logs):
            if tr_details["order_id"] is None and "order_id" in log_entry:
                tr_details["order_id"] = log_entry.split(":")[-1].strip()
                # Guard against truncated logs: without the two follow-up
                # lines the tokens stay None instead of raising IndexError.
                if i + 2 < len(logs):
                    tr_details["token_in"] = logs[i + 1].split(":")[-1].strip()
                    tr_details["token_out"] = logs[i + 2].split(":")[-1].strip()

            if "source_token_change" in log_entry:
                # Read each amount from its own field; the line may also carry
                # destination_token_change after a comma.
                source_token_change = LogProcessor._field_amount(
                    log_entry, "source_token_change")
                tr_details["amount_in"] = source_token_change
                if "destination_token_change" in log_entry:
                    tr_details["amount_out"] = LogProcessor._field_amount(
                        log_entry, "destination_token_change")

            if "before_source_balance" in log_entry:
                before_source_balance = LogProcessor._field_amount(
                    log_entry, "before_source_balance")

        if before_source_balance > 0 and source_token_change > 0:
            tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100
            # NOTE(review): values above 100% are scaled down by 1000; the
            # reason is not visible here - confirm this is still wanted.
            if tr_details["percentage_swapped"] > 100:
                tr_details["percentage_swapped"] /= 1000

        return tr_details

    @staticmethod
    async def process_log(log_result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Process a single log entry.

        Ignores failed transactions and logs without a swap instruction.
        For swaps: saves the raw log, extracts the details (falling back to
        ``SAPI.get_transaction_details_info`` when the logs are incomplete),
        enriches them with symbol and USD values, sends a Telegram
        notification and triggers the follow-up actions.

        Returns:
            The transaction details, or ``None`` when the log was skipped or
            an error occurred before any details were extracted.
        """
        if log_result['value']['err']:
            return None

        logs = log_result['value']['logs']
        swap_operations = [
            'Program log: Instruction: Swap',
            'Program log: Instruction: Swap2',
            'Program log: Instruction: SwapExactAmountIn',
            'Program log: Instruction: SwapV2'
        ]

        # Bound up front so the final return cannot hit an unassigned name
        # when an exception fires before the details are extracted.
        tr_details = None
        try:
            if not any(op in logs for op in swap_operations):
                return None

            LogProcessor.save_log(log_result)
            tx_signature = log_result['value']['signature']
            tr_details = LogProcessor.extract_transaction_details(logs)

            if not all([tr_details["token_in"], tr_details["token_out"],
                        tr_details["amount_in"], tr_details["amount_out"]]):
                tr_details = await SAPI.get_transaction_details_info(tx_signature, logs)

            # Update token information
            token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]]
            token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]]
            tr_details.update({
                "symbol_in": token_in.get('symbol'),
                "symbol_out": token_out.get('symbol'),
                "amount_in_USD": tr_details['amount_in'] * token_in.get('price', 0),
                "amount_out_USD": tr_details['amount_out'] * token_out.get('price', 0)
            })

            # Send notification
            message = (
                f"<b>Swap detected: </b>\n"
                f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} "
                f"({tr_details['percentage_swapped']:.2f}% ) swapped for {tr_details['symbol_out']}"
            )
            await TelegramUtils.send_telegram_message(message)

            # Follow up actions
            await SAPI.follow_move(tr_details)
            await SAPI.save_token_info()
        except Exception as e:
            logger.error(f"Error processing log: {e}")
            await TelegramUtils.send_telegram_message("Not followed! Error following move.")

        return tr_details

    @staticmethod
    async def process_log_fileDump(self, file_path=None):
        """Ingest one dumped file and mark it as handled.

        Reads the file, passes its raw text to ``SolanaAPI.process_wh``,
        stores a transaction record if none exists for the signature, and
        renames the file by appending ``_saved`` to its stem.

        Args:
            self: Historical first parameter of this static method. When
                ``file_path`` is omitted, this argument is taken as the path.
            file_path: Path (``str`` or ``Path``) of the file to ingest.
        """
        # The method is a @staticmethod that nevertheless declares `self`, so
        # a one-argument call binds the path to `self`. Accept both forms.
        if file_path is None:
            file_path = self
        file_path = Path(file_path)  # .stem/.with_name below need a Path

        # Read the file and extract transaction data
        with open(file_path, 'r') as file:
            data = file.read()

        # TODO: these are hard-coded placeholders, nothing is parsed from
        # `data` yet, so every file maps to the same signature.
        wallet_id = "extracted_wallet_id"
        transaction_type = "extracted_transaction_type"
        sell_currency = "extracted_sell_currency"
        sell_amount = 0.0
        sell_value = 0.0
        buy_currency = "extracted_buy_currency"
        buy_amount = 0.0
        buy_value = 0.0
        solana_signature = "extracted_solana_signature"
        details = {}

        # Process the webhook data
        # NOTE(review): the returned value is overwritten below before use.
        solana_api = SolanaAPI()
        transaction_data = await solana_api.process_wh(data)

        # Check if the transaction already exists
        existing_transaction = await Storage.get_prisma().transaction.find_first(
            where={'solana_signature': solana_signature}
        )

        if not existing_transaction:
            # Store the transaction if it doesn't exist
            transaction_data = {
                'wallet_id': wallet_id,
                'type': transaction_type,
                'sell_currency': sell_currency,
                'sell_amount': sell_amount,
                'sell_value': sell_value,
                'buy_currency': buy_currency,
                'buy_amount': buy_amount,
                'buy_value': buy_value,
                'solana_signature': solana_signature,
                'details': details
            }
            storage = Storage()
            await storage.store_transaction(transaction_data)

        # Rename the file to append '_saved'
        new_file_path = file_path.with_name(file_path.stem + "_saved" + file_path.suffix)
        os.rename(file_path, new_file_path)

    @staticmethod
    async def watch_for_new_logs():
        """Poll ``LOG_DIRECTORY`` forever and ingest files matching ``FILE_MASK``.

        Never returns; an exception raised while processing a file
        propagates to the caller.
        """
        while True:
            # Snapshot the matches first: files are renamed while we iterate.
            for file_path in sorted(Path(LOG_DIRECTORY).glob(FILE_MASK)):
                # A handled file ("wh_x_saved.json") still matches the mask;
                # skip it so it is not ingested and renamed again each cycle.
                if file_path.stem.endswith("_saved"):
                    continue
                await LogProcessor.process_log_fileDump(file_path)
            await asyncio.sleep(10)  # Check every 10 seconds