# telegram_utils.py
"""Telegram notifications, log-file setup and private-key loading helpers."""
import csv
import io
import logging
import os
import sys
import time
from logging.handlers import RotatingFileHandler

# Put the parent of this file's directory on sys.path before importing
# ``config`` (presumably that is where config.py lives -- TODO confirm).
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import aiohttp
from telegram import Bot
from telegram.constants import ParseMode

from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME

# Name of the logger that receives one record per successful swap.
_SUCCESS_LOGGER_NAME = 'success_logger_accounting_csv'

# LogRecord attributes (supplied through ``extra=``) written to each CSV row,
# in column order, after the timestamp.
_SWAP_FIELDS = (
    'token_in',
    'token_out',
    'amount_in',
    'amount_out',
    'usd_value_in',
    'usd_value_out',
    'tx_hash',
    'wallet_address',
)

_SWAP_CSV_HEADER = (
    'Timestamp',
    'Token In',
    'Token Out',
    'Amount In',
    'Amount Out',
    'USD Value In',
    'USD Value Out',
    'Transaction Hash',
    'Wallet Address',
)


def _csv_line(values):
    """Return *values* encoded as a single CSV line without a line ending."""
    buffer = io.StringIO()
    csv.writer(buffer).writerow(values)
    # csv.writer appends its own terminator; the logging handler adds one too.
    return buffer.getvalue().rstrip('\r\n')


def _has_file_handler(logger, path):
    """Return True if *logger* already has a rotating handler writing *path*."""
    target = os.path.abspath(path)
    return any(
        isinstance(handler, RotatingFileHandler)
        and handler.baseFilename == target
        for handler in logger.handlers
    )


class TelegramUtils:
    """Lazily initialised wrapper around a Telegram ``Bot``."""

    def __init__(self):
        self.bot = None
        self.conn_pool = None
        self.timeout = None

    async def initialize(self):
        """Create the connector, the timeout object and the Telegram bot."""
        # NOTE(review): conn_pool and timeout are created here but are not
        # passed to Bot(); they are only stored on the instance.
        self.conn_pool = aiohttp.TCPConnector(limit=100)
        self.timeout = aiohttp.ClientTimeout(total=30)
        self.bot = Bot(token=TELEGRAM_BOT_TOKEN)

    async def send_telegram_message(self, message):
        """Send *message*, prefixed with the bot name, to the developer chat.

        The bot is initialised on first use. Failures are logged and
        swallowed so that a notification problem never propagates to the
        caller.
        """
        if not self.bot:
            await self.initialize()
        try:
            await self.bot.send_message(
                chat_id=DEVELOPER_CHAT_ID,
                text=f"[{BOT_NAME}] {message}",
                parse_mode=ParseMode.HTML,
            )
            logging.info("Telegram message sent: %s", message)
        except Exception as e:
            # Deliberate best-effort: report and carry on.
            logging.error("Error sending Telegram message: %s", str(e))

    async def close(self):
        """Close the connector created by :meth:`initialize`, if any."""
        if self.conn_pool:
            await self.conn_pool.close()


class CSVFormatter(logging.Formatter):
    """Format a swap log record as one CSV row.

    The formatter is stateless and returns the row as a string, so it is safe
    for handlers (such as ``RotatingFileHandler``) that call ``format`` more
    than once per record. Writing the header row is the handler's job.
    """

    def __init__(self):
        super().__init__()

    def format(self, record):
        """Return the CSV line for *record*.

        Columns are the record timestamp followed by the swap attributes in
        ``_SWAP_FIELDS``; an attribute missing from the record yields an
        empty cell instead of raising.
        """
        row = [self.formatTime(record, self.datefmt)]
        row.extend(getattr(record, field, '') for field in _SWAP_FIELDS)
        return _csv_line(row)


class _CSVRotatingFileHandler(RotatingFileHandler):
    """Rotating file handler that starts every empty file with the CSV header."""

    def _open(self):
        # Called on construction and again after each rollover.
        stream = super()._open()
        if os.fstat(stream.fileno()).st_size == 0:
            stream.write(_csv_line(_SWAP_CSV_HEADER) + self.terminator)
        return stream


class Log:
    """Configure root logging, the error log file and the swap accounting CSV."""

    def __init__(self):
        # Use logging.INFO here instead for less verbose output.
        logging.basicConfig(level=logging.DEBUG)

        log_dir = './logs'
        os.makedirs(log_dir, exist_ok=True)

        # Set up error logger
        log_file = os.path.join(log_dir, 'error.log')
        error_logger = logging.getLogger('error_logger')
        error_logger.setLevel(logging.ERROR)
        # Loggers are process-wide singletons: only attach the handler once,
        # otherwise every extra Log() would duplicate each written line.
        if not _has_file_handler(error_logger, log_file):
            error_file_handler = RotatingFileHandler(
                log_file, maxBytes=10 * 1024 * 1024, backupCount=5)
            error_file_handler.setLevel(logging.ERROR)
            error_file_handler.setFormatter(
                logging.Formatter('%(asctime)s [%(levelname)s] %(message)s',
                                  datefmt='%Y-%m-%d %H:%M:%S'))
            error_file_handler.formatter.converter = time.localtime
            error_logger.addHandler(error_file_handler)

        # Set up success logger for accounting CSV
        success_log_file = os.path.join(log_dir, 'successful_swaps.csv')
        success_logger_accounting_csv = logging.getLogger(_SUCCESS_LOGGER_NAME)
        success_logger_accounting_csv.setLevel(logging.INFO)
        if not _has_file_handler(success_logger_accounting_csv,
                                 success_log_file):
            success_file_handler = _CSVRotatingFileHandler(
                success_log_file, maxBytes=10 * 1024 * 1024, backupCount=5)
            success_file_handler.setFormatter(CSVFormatter())
            success_logger_accounting_csv.addHandler(success_file_handler)


# NOTE(review): the two helpers below take no ``self``, so they are kept as
# module-level functions -- confirm no caller reaches them through ``Log``.
def log_successful_swap(token_in, token_out, amount_in, amount_out,
                        usd_value_in, usd_value_out, tx_hash, wallet_address):
    """Append one row describing a successful swap to the accounting CSV.

    The values travel as ``extra`` attributes on an INFO record sent to the
    swap logger, where ``CSVFormatter`` turns them into a CSV line.
    """
    # Look the logger up by name: the variable holding it is local to
    # Log.__init__ and is not visible from here.
    logging.getLogger(_SUCCESS_LOGGER_NAME).info('', extra={
        'token_in': token_in,
        'token_out': token_out,
        'amount_in': amount_in,
        'amount_out': amount_out,
        'usd_value_in': usd_value_in,
        'usd_value_out': usd_value_out,
        'tx_hash': tx_hash,
        'wallet_address': wallet_address
    })


def safe_get_property(info, property_name, default='Unknown'):
    """Return ``info[property_name]`` as a string, or *default* as a string.

    *default* is used when *info* is not a dict, when the key is missing, or
    when the stored value is ``None``.
    """
    if not isinstance(info, dict):
        return str(default)
    value = info.get(property_name, default)
    return str(value) if value is not None else str(default)


# Create a global instance of TelegramUtils
telegram_utils = TelegramUtils()
log = Log()

# You can add more Telegram-related methods to the TelegramUtils class if needed

pk = os.getenv("PK")


async def get_pk():
    """Return the private key, loading it from ``secret.pk`` if necessary.

    The module-level ``pk`` (initialised from the ``PK`` environment
    variable) is returned when already set. Otherwise the key is read from a
    ``secret.pk`` file next to this module and cached in ``pk``. When no key
    can be found, an error is logged, a Telegram warning is sent, and the
    falsy ``pk`` value is returned.
    """
    global pk
    if pk:
        return pk

    try:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        with open(os.path.join(script_dir, 'secret.pk'), 'r') as f:
            pk = f.read().strip()
        if pk:
            logging.info("Private key loaded successfully from file.")
        else:
            logging.warning("Private key file is empty.")
    except FileNotFoundError:
        logging.warning("Private key file not found.")
    except Exception as e:
        logging.error("Error reading private key file: %s", str(e))

    if not pk:
        logging.error("Private key not found in environment variables. Will not be able to sign transactions.")
        # send TG warning message
        await telegram_utils.send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.")
    return pk