gogo2/crypto/sol/modules/utils.py
2024-10-15 23:19:30 +03:00

43 lines
1.4 KiB
Python

# telegram_utils.py
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import aiohttp
import logging
from telegram import Bot
from telegram.constants import ParseMode
from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME
class TelegramUtils:
def __init__(self):
self.bot = None
self.conn_pool = None
self.timeout = None
async def initialize(self):
# Create a custom connection pool
self.conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit
self.timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout
# Initialize Telegram Bot
self.bot = Bot(token=TELEGRAM_BOT_TOKEN)
async def send_telegram_message(self, message):
if not self.bot:
await self.initialize()
try:
# await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML)
logging.info(f"Telegram message sent: {message}")
except Exception as e:
logging.error(f"Error sending Telegram message: {str(e)}")
async def close(self):
if self.conn_pool:
await self.conn_pool.close()
# Create a global instance of TelegramUtils
telegram_utils = TelegramUtils()
# You can add more Telegram-related methods to the TelegramUtils class if needed