telegram stability fix

This commit is contained in:
Dobromir Popov 2024-10-12 12:52:10 +03:00
parent 75565e21d8
commit 95dcdbd9cc

View File

@ -119,7 +119,12 @@ dexscreener_client = DexscreenerClient()
# Initialize Telegram Bot # Initialize Telegram Bot
bot = Bot(token=TELEGRAM_BOT_TOKEN) # Create a custom connection pool
conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit
timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout
# Create the bot with the custom connection pool
bot = Bot(TELEGRAM_BOT_TOKEN, request=aiohttp.ClientSession(connector=conn_pool, timeout=timeout).request)
# Token addresses (initialize with some known tokens) # Token addresses (initialize with some known tokens)
TOKEN_ADDRESSES = { TOKEN_ADDRESSES = {
@ -1257,9 +1262,10 @@ SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes
# except websockets.exceptions.ConnectionClosed: # except websockets.exceptions.ConnectionClosed:
# break # break
first_subscription = True _first_subscription = True
_process_task = None
async def wallet_watch_loop(): async def wallet_watch_loop():
global first_subscription global first_subscription, process_task
reconnect_delay = 5 reconnect_delay = 5
max_reconnect_delay = 60 max_reconnect_delay = 60
@ -1279,21 +1285,21 @@ async def wallet_watch_loop():
subscription_id = await subscribe(websocket) subscription_id = await subscribe(websocket)
if subscription_id is not None: if subscription_id is not None:
await send_telegram_message(f"Solana mainnet connected ({subscription_id})...") await send_telegram_message(f"Solana mainnet connected ({subscription_id})...")
if first_subscription: if _first_subscription:
asyncio.create_task( list_initial_wallet_states()) asyncio.create_task( list_initial_wallet_states())
first_subscription = False _first_subscription = False
process_task = asyncio.create_task(process_messages(websocket, subscription_id)) _process_task = asyncio.create_task(process_messages(websocket, subscription_id))
while True: while True:
try:# drop subscription now try:# drop subscription now
await process_messages(websocket, subscription_id) await process_messages(websocket, subscription_id)
# await asyncio.run(process_task) # await asyncio.run(_process_task)
# await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL) # await asyncio.wait_for(_process_task, timeout=SUBSCRIBE_INTERVAL)
except asyncio.TimeoutError: except asyncio.TimeoutError:
# Timeout occurred, time to resubscribe # Timeout occurred, time to resubscribe
if not PROCESSING_LOG: if not PROCESSING_LOG:
process_task.cancel() _process_task.cancel()
try: try:
await process_task await _process_task
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
await unsubscribe(websocket, subscription_id) await unsubscribe(websocket, subscription_id)