From eeeab53e114c2b670ae6dd9fdf6395ce33ef5c4c Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Fri, 11 Oct 2024 10:59:49 +0300 Subject: [PATCH] fix rpc --- crypto/sol/app.py | 147 ++++++++++++++++++++++++++-------------------- 1 file changed, 82 insertions(+), 65 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index ecb0814..e15ce29 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -38,6 +38,7 @@ import requests import threading import re from typing import List, Dict, Any, Tuple +import random load_dotenv() @@ -1108,8 +1109,15 @@ async def follow_move(move): await send_telegram_message(error_message) -# Helper functions (implement these according to your needs) - +# Helper functions +SOLANA_ENDPOINTS = [ + "wss://api.mainnet-beta.solana.com", + # "wss://solana-api.projectserum.com", + # "wss://rpc.ankr.com/solana", + # "wss://mainnet.rpcpool.com", +] +PING_INTERVAL = 30 +SUBSCRIBE_INTERVAL = 180 # Resubscribe every 3 minutes async def heartbeat(websocket): @@ -1120,89 +1128,98 @@ async def heartbeat(websocket): except websockets.exceptions.ConnectionClosed: break -import random +first_subscription = True async def subscribe_to_wallet(): - SOLANA_ENDPOINTS = [ - "wss://api.mainnet-beta.solana.com", - "wss://solana-api.projectserum.com", - "wss://rpc.ankr.com/solana", - "wss://mainnet.rpcpool.com", - ] - uri = SOLANA_WS_URL # wss://api.mainnet-beta.solana.com - reconnect_delay = 5 # Start with a 5-second delay - max_reconnect_delay = 10 # Maximum delay of 60 seconds + reconnect_delay = 5 + max_reconnect_delay = 60 while True: try: current_url = random.choice(SOLANA_ENDPOINTS) async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket: - logger.info("Connected to Solana websocket") - + logger.info(f"Connected to Solana websocket: {current_url}") heartbeat_task = asyncio.create_task(heartbeat(websocket)) + if first_subscription: + asyncio.create_task( list_initial_wallet_states()) + first_subscription = False - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsSubscribe", - "params": [ - { - "mentions": [FOLLOWED_WALLET] - }, - { - "commitment": "confirmed" - } - ] - } - - await websocket.send(json.dumps(request)) - logger.info("Subscription request sent") - conn_active = False while True: - try: - response = await websocket.recv() - conn_active = True - response_data = json.loads(response) - logger.debug(f"Received response: {response_data}") - if 'result' in response_data: - subscription_id = response_data['result'] - asyncio.create_task( list_initial_wallet_states()) - logger.info(f"Subscription successful. Subscription id: {subscription_id}") - await send_telegram_message("Connected to Solana network. Watching for transactions now.") - - elif 'params' in response_data: - log = response_data['params']['result'] - logging.debug(f"Received transaction log: {log}") - asyncio.create_task(process_log(log)) - - else: - logger.warning(f"Unexpected response: {response}") + subscription_id = await subscribe(websocket) + if subscription_id: + await process_messages(websocket, subscription_id) - except websockets.exceptions.ConnectionClosedError as e: - logger.error(f"Connection closed unexpectedly: {e}") - if conn_active: - conn_active = False - await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now.\n Attempting to reconnect...") - break - except json.JSONDecodeError as e: - logger.error(f"Failed to decode JSON: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - break + await asyncio.sleep(SUBSCRIBE_INTERVAL) + await unsubscribe(websocket, subscription_id) - # Cancel the heartbeat task when the connection is closed heartbeat_task.cancel() except websockets.exceptions.WebSocketException as e: logger.error(f"WebSocket error: {e}") except Exception as e: logger.error(f"An unexpected error occurred: {e}") - current_url = random.choice(SOLANA_ENDPOINTS) logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") await asyncio.sleep(reconnect_delay) - - # Implement exponential backoff - reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay) + reconnect_delay = min(reconnect_delay * 1.2, max_reconnect_delay) + +async def subscribe(websocket): + request = { + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ + {"mentions": [FOLLOWED_WALLET]}, + {"commitment": "confirmed"} + ] + } + + await websocket.send(json.dumps(request)) + logger.info("Subscription request sent") + + response = await websocket.recv() + response_data = json.loads(response) + + if 'result' in response_data: + subscription_id = response_data['result'] + logger.info(f"Subscription successful. Subscription id: {subscription_id}") + await send_telegram_message("Connected to Solana network. Watching for transactions now.") + return subscription_id + else: + logger.warning(f"Unexpected response: {response_data}") + return None + +async def unsubscribe(websocket, subscription_id): + if subscription_id: + request = { + "jsonrpc": "2.0", + "id": 1, + "method": "logsUnsubscribe", + "params": [subscription_id] + } + await websocket.send(json.dumps(request)) + logger.info(f"Unsubscribed from subscription id: {subscription_id}") + +async def process_messages(websocket, subscription_id): + try: + while True: + response = await websocket.recv() + response_data = json.loads(response) + logger.debug(f"Received response: {response_data}") + + if 'params' in response_data: + log = response_data['params']['result'] + logger.debug(f"Received transaction log: {log}") + asyncio.create_task(process_log(log)) + else: + logger.warning(f"Unexpected response: {response_data}") + + except websockets.exceptions.ConnectionClosedError as e: + logger.error(f"Connection closed unexpectedly: {e}") + await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") + except json.JSONDecodeError as e: + logger.error(f"Failed to decode JSON: {e}") + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") pk = os.getenv("PK")