wip (broken)

This commit is contained in:
Dobromir Popov
2024-11-15 00:22:20 +02:00
parent b623c4cb15
commit fee5b516a1
4 changed files with 212 additions and 211 deletions

View File

@ -1,60 +1,182 @@
from asyncio.log import logger
import datetime
import json
import os
import asyncio
from pathlib import Path
from typing import Any, Dict
from .utils import TelegramUtils
from .storage import Storage
from .SolanaAPI import SolanaAPI
from .SolanaAPI import SAPI, SolanaAPI
LOG_DIRECTORY = "./logs"
FILE_MASK = "wh_*.json"
async def process_log_file(file_path):
# Read the file and extract transaction data
with open(file_path, 'r') as file:
data = file.read()
# Assume data is parsed into these variables
wallet_id = "extracted_wallet_id"
transaction_type = "extracted_transaction_type"
sell_currency = "extracted_sell_currency"
sell_amount = 0.0
sell_value = 0.0
buy_currency = "extracted_buy_currency"
buy_amount = 0.0
buy_value = 0.0
solana_signature = "extracted_solana_signature"
details = {}
# Process the webhook data
solana_api = SolanaAPI()
transaction_data = await solana_api.process_wh(data)
# Check if the transaction already exists
existing_transaction = await Storage.get_prisma().transaction.find_first(
where={'solana_signature': solana_signature}
)
class LogProcessor:
@staticmethod
def save_log(log: Dict[str, Any]) -> None:
"""Save log to JSON file with timestamp."""
try:
os.makedirs('./logs', exist_ok=True)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename = f"./logs/log_{timestamp}.json"
with open(filename, 'w') as f:
json.dump(log, f, indent=2)
except Exception as e:
logger.error(f"Error saving RPC log: {e}")
if not existing_transaction:
# Store the transaction if it doesn't exist
transaction_data = {
'wallet_id': wallet_id,
'type': transaction_type,
'sell_currency': sell_currency,
'sell_amount': sell_amount,
'sell_value': sell_value,
'buy_currency': buy_currency,
'buy_amount': buy_amount,
'buy_value': buy_value,
'solana_signature': solana_signature,
'details': details
@staticmethod
def extract_transaction_details(logs: list) -> Dict[str, Any]:
"""Extract transaction details from logs."""
tr_details = {
"order_id": None,
"token_in": None,
"token_out": None,
"amount_in": 0,
"amount_out": 0,
"amount_in_USD": 0,
"amount_out_USD": 0,
"percentage_swapped": 0
}
storage = Storage()
await storage.store_transaction(transaction_data)
# Rename the file to append '_saved'
new_file_path = file_path.with_name(file_path.stem + "_saved" + file_path.suffix)
os.rename(file_path, new_file_path)
before_source_balance = 0
source_token_change = 0
async def watch_for_new_logs():
while True:
for file_path in Path(LOG_DIRECTORY).glob(FILE_MASK):
await process_log_file(file_path)
await asyncio.sleep(10) # Check every 10 seconds
for i, log_entry in enumerate(logs):
if tr_details["order_id"] is None and "order_id" in log_entry:
tr_details["order_id"] = log_entry.split(":")[-1].strip()
tr_details["token_in"] = logs[i + 1].split(":")[-1].strip()
tr_details["token_out"] = logs[i + 2].split(":")[-1].strip()
if "source_token_change" in log_entry:
parts = log_entry.split(", ")
for part in parts:
if "source_token_change" in part:
tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6
elif "destination_token_change" in part:
tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6
if "before_source_balance" in log_entry:
before_source_balance = float(log_entry.split(":")[-1].strip()) / 10 ** 6
if "source_token_change" in log_entry:
source_token_change = float(log_entry.split(":")[-1].strip()) / 10 ** 6
if before_source_balance > 0 and source_token_change > 0:
tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100
if tr_details["percentage_swapped"] > 100:
tr_details["percentage_swapped"] /= 1000
return tr_details
@staticmethod
async def process_log(log_result: Dict[str, Any]) -> Dict[str, Any]:
"""Process a single log entry."""
if log_result['value']['err']:
return
logs = log_result['value']['logs']
swap_operations = [
'Program log: Instruction: Swap',
'Program log: Instruction: Swap2',
'Program log: Instruction: SwapExactAmountIn',
'Program log: Instruction: SwapV2'
]
try:
if not any(op in logs for op in swap_operations):
return
LogProcessor.save_log(log_result)
tx_signature = log_result['value']['signature']
tr_details = LogProcessor.extract_transaction_details(logs)
if not all([tr_details["token_in"], tr_details["token_out"],
tr_details["amount_in"], tr_details["amount_out"]]):
tr_details = await SAPI.get_transaction_details_info(tx_signature, logs)
# Update token information
token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]]
token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]]
tr_details.update({
"symbol_in": token_in.get('symbol'),
"symbol_out": token_out.get('symbol'),
"amount_in_USD": tr_details['amount_in'] * token_in.get('price', 0),
"amount_out_USD": tr_details['amount_out'] * token_out.get('price', 0)
})
# Send notification
message = (
f"<b>Swap detected: </b>\n"
f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} "
f"({tr_details['percentage_swapped']:.2f}% ) swapped for {tr_details['symbol_out']}"
)
await TelegramUtils.send_telegram_message(message)
# Follow up actions
await SAPI.follow_move(tr_details)
await SAPI.save_token_info()
except Exception as e:
logger.error(f"Error processing log: {e}")
await TelegramUtils.send_telegram_message("Not followed! Error following move.")
return tr_details
@staticmethod
async def process_log_fileDump(self, file_path):
# Read the file and extract transaction data
with open(file_path, 'r') as file:
data = file.read()
# Assume data is parsed into these variables
wallet_id = "extracted_wallet_id"
transaction_type = "extracted_transaction_type"
sell_currency = "extracted_sell_currency"
sell_amount = 0.0
sell_value = 0.0
buy_currency = "extracted_buy_currency"
buy_amount = 0.0
buy_value = 0.0
solana_signature = "extracted_solana_signature"
details = {}
# Process the webhook data
solana_api = SolanaAPI()
transaction_data = await solana_api.process_wh(data)
# Check if the transaction already exists
existing_transaction = await Storage.get_prisma().transaction.find_first(
where={'solana_signature': solana_signature}
)
if not existing_transaction:
# Store the transaction if it doesn't exist
transaction_data = {
'wallet_id': wallet_id,
'type': transaction_type,
'sell_currency': sell_currency,
'sell_amount': sell_amount,
'sell_value': sell_value,
'buy_currency': buy_currency,
'buy_amount': buy_amount,
'buy_value': buy_value,
'solana_signature': solana_signature,
'details': details
}
storage = Storage()
await storage.store_transaction(transaction_data)
# Rename the file to append '_saved'
new_file_path = file_path.with_name(file_path.stem + "_saved" + file_path.suffix)
os.rename(file_path, new_file_path)
@staticmethod
async def watch_for_new_logs():
while True:
for file_path in Path(LOG_DIRECTORY).glob(FILE_MASK):
await LogProcessor.process_log_fileDump(file_path)
await asyncio.sleep(10) # Check every 10 seconds