gogo2/crypto/sol/app.py
2024-11-14 15:39:35 +02:00

180 lines
6.2 KiB
Python

import asyncio
import datetime
import json
import logging
import os
from typing import Any, Dict, Optional

import uvicorn
from asgiref.wsgi import WsgiToAsgi
from dotenv import load_dotenv

from config import DO_WATCH_WALLET
from modules.SolanaAPI import SAPI
from modules.log_processor import watch_for_new_logs
from modules.utils import telegram_utils
from modules.webui import init_app, teardown_app
# Load environment variables: the default .env lookup first, then the
# separate '.env.secret' file.
# NOTE(review): python-dotenv does not override already-set variables by
# default, so a key present in both files presumably keeps the value from the
# first call -- confirm this precedence is intended.
load_dotenv()
load_dotenv('.env.secret')
# Module-level logger named after this module; no handlers, levels or
# formatters are configured in this file.
logger = logging.getLogger(__name__)
class LogProcessor:
    """Namespace of static helpers that turn RPC log notifications into swap details.

    ``process_log`` is the entry point; ``save_log`` and
    ``extract_transaction_details`` are the steps it relies on.
    """

    # Divisor applied to every raw amount read from the logs.
    # NOTE(review): presumably the amounts are in 6-decimal base units -- confirm
    # for tokens that use a different number of decimals.
    _AMOUNT_SCALE = 10 ** 6

    @staticmethod
    def _scaled_value(part: str) -> float:
        """Return the number after the last ``:`` in *part*, divided by the amount scale.

        Raises:
            ValueError: if the text after the last ``:`` is not a number.
        """
        return float(part.split(":")[-1].strip()) / LogProcessor._AMOUNT_SCALE

    @staticmethod
    def save_log(log: Dict[str, Any]) -> None:
        """Dump *log* as indented JSON into ``./logs/log_<timestamp>.json``.

        The directory is created on demand. This is best-effort: any failure is
        logged and swallowed so that log processing can continue.
        """
        try:
            os.makedirs('./logs', exist_ok=True)
            # Microsecond resolution keeps file names distinct for bursts of logs.
            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
            filename = f"./logs/log_{timestamp}.json"
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(log, f, indent=2)
        except Exception as e:
            logger.error(f"Error saving RPC log: {e}")

    @staticmethod
    def extract_transaction_details(logs: list) -> Dict[str, Any]:
        """Extract swap details from a list of program log lines.

        Args:
            logs: Log lines (strings) of one transaction.

        Returns:
            A dict with the keys ``order_id``, ``token_in``, ``token_out``,
            ``amount_in``, ``amount_out``, ``amount_in_USD``, ``amount_out_USD``
            and ``percentage_swapped``. Fields that cannot be found keep their
            defaults (``None`` for identifiers, ``0`` for numbers); the USD
            fields are never filled in here.

        Raises:
            ValueError: if a recognised amount field does not hold a number.
        """
        tr_details = {
            "order_id": None,
            "token_in": None,
            "token_out": None,
            "amount_in": 0,
            "amount_out": 0,
            "amount_in_USD": 0,
            "amount_out_USD": 0,
            "percentage_swapped": 0,
        }
        before_source_balance = 0
        source_token_change = 0

        for i, log_entry in enumerate(logs):
            if tr_details["order_id"] is None and "order_id" in log_entry:
                tr_details["order_id"] = log_entry.split(":")[-1].strip()
                # The two lines after the order id are read as the token pair.
                # Only read them when both exist, so a truncated log list
                # leaves the tokens unset instead of raising IndexError.
                if i + 2 < len(logs):
                    tr_details["token_in"] = logs[i + 1].split(":")[-1].strip()
                    tr_details["token_out"] = logs[i + 2].split(":")[-1].strip()

            # A single line may carry several ", "-separated "name: value"
            # fields, so every value is taken from the field that names it
            # rather than from the end of the whole line.
            parts = log_entry.split(", ")

            if "source_token_change" in log_entry:
                for part in parts:
                    if "source_token_change" in part:
                        source_token_change = LogProcessor._scaled_value(part)
                        tr_details["amount_in"] = source_token_change
                    elif "destination_token_change" in part:
                        tr_details["amount_out"] = LogProcessor._scaled_value(part)

            if "before_source_balance" in log_entry:
                for part in parts:
                    if "before_source_balance" in part:
                        before_source_balance = LogProcessor._scaled_value(part)

        if before_source_balance > 0 and source_token_change > 0:
            tr_details["percentage_swapped"] = (
                source_token_change / before_source_balance
            ) * 100
            # NOTE(review): heuristic kept as-is -- a result above 100% is
            # divided by 1000; the rationale is not visible in this file.
            if tr_details["percentage_swapped"] > 100:
                tr_details["percentage_swapped"] /= 1000

        return tr_details

    @staticmethod
    async def process_log(log_result: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Handle one log notification: detect a swap, notify, and follow it.

        Args:
            log_result: Notification payload; ``log_result['value']`` must hold
                ``err`` and ``logs``, plus ``signature`` for swap transactions.

        Returns:
            ``None`` when the transaction failed, contains no swap instruction,
            or an error occurred before any details were extracted; otherwise
            the transaction-details dict (possibly only partially enriched if
            an error occurred later on).
        """
        if log_result['value']['err']:
            return None

        logs = log_result['value']['logs']
        # A log line has to equal one of these strings exactly to count as a swap.
        swap_operations = (
            'Program log: Instruction: Swap',
            'Program log: Instruction: Swap2',
            'Program log: Instruction: SwapExactAmountIn',
            'Program log: Instruction: SwapV2',
        )

        # Bound up front so the final return is safe even when the failure
        # happens before any details were extracted.
        tr_details = None
        try:
            if not any(op in logs for op in swap_operations):
                return None

            LogProcessor.save_log(log_result)
            tx_signature = log_result['value']['signature']
            tr_details = LogProcessor.extract_transaction_details(logs)

            # Fall back to the API when the logs alone did not yield the full swap.
            if not all([tr_details["token_in"], tr_details["token_out"],
                        tr_details["amount_in"], tr_details["amount_out"]]):
                tr_details = await SAPI.get_transaction_details_info(tx_signature, logs)

            # Update token information
            token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]]
            token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]]
            tr_details.update({
                "symbol_in": token_in.get('symbol'),
                "symbol_out": token_out.get('symbol'),
                "amount_in_USD": tr_details['amount_in'] * token_in.get('price', 0),
                "amount_out_USD": tr_details['amount_out'] * token_out.get('price', 0)
            })

            # Send notification
            message = (
                f"<b>Swap detected: </b>\n"
                f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} "
                f"({tr_details['percentage_swapped']:.2f}% ) swapped for {tr_details['symbol_out']}"
            )
            await telegram_utils.send_telegram_message(message)

            # Follow up actions
            await SAPI.follow_move(tr_details)
            await SAPI.save_token_info()
        except Exception as e:
            logger.error(f"Error processing log: {e}")
            await telegram_utils.send_telegram_message("Not followed! Error following move.")

        return tr_details
class Bot:
    """Namespace for bot start-up."""

    # Strong references to the fire-and-forget watcher tasks. The event loop
    # only keeps weak references to tasks, so a task nobody else references
    # may be garbage-collected before it finishes.
    _background_tasks = set()

    @staticmethod
    def _spawn(coro):
        """Schedule *coro* as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        Bot._background_tasks.add(task)
        # Drop the reference once the task finishes so the set does not grow.
        task.add_done_callback(Bot._background_tasks.discard)
        return task

    @staticmethod
    async def initialize():
        """Initialize Telegram messaging, announce start-up and launch the watchers.

        The log watcher is always started; the wallet watch loop only when
        ``DO_WATCH_WALLET`` is truthy. Both run as background tasks, so this
        coroutine returns without waiting for them.
        """
        await telegram_utils.initialize()
        await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")
        Bot._spawn(watch_for_new_logs())
        if DO_WATCH_WALLET:
            Bot._spawn(SAPI.wallet_watch_loop())
async def start_server():
    """Run the ASGI server on 0.0.0.0:3001 until it exits.

    The application object stored in the module-level ``asgi_app`` is handed
    to uvicorn directly when it exists. The import string ``"app:asgi_app"``
    makes uvicorn import a module named ``app``; when this file runs as a
    script that is a fresh copy of the module in which ``asgi_app`` was never
    assigned. The import string is kept as a fallback for when the global has
    not been set.
    """
    app_target = globals().get("asgi_app", "app:asgi_app")
    config = uvicorn.Config(
        app_target,
        host="0.0.0.0",
        port=3001,
        log_level="info",
        # NOTE(review): auto-reload is driven by uvicorn's reload supervisor
        # (uvicorn.run / CLI); with Server.serve() this flag presumably has no
        # effect -- confirm and drop it if so.
        reload=True,
    )
    server = uvicorn.Server(config)
    await server.serve()
async def main():
    """Main application entry point.

    Builds the web app, publishes its ASGI wrapper in the module-level
    ``asgi_app``, starts the bot and then serves until the server stops.
    ``teardown_app`` runs in a ``finally`` clause, so it is executed however
    the bot start-up or the server ends (normal return, error, cancellation
    or Ctrl-C).
    """
    global asgi_app

    # Initialize app and create ASGI wrapper
    app = await init_app()
    asgi_app = WsgiToAsgi(app)

    try:
        # Initialize bot
        await Bot.initialize()
        # Start server
        await start_server()
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    finally:
        # Pair every successful init_app() with a teardown.
        await teardown_app()
# Script entry point: drive main() on a fresh event loop when executed directly.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the agent; log it instead of
        # letting the traceback reach the console.
        logger.info("Application terminated by user")