From 3db80762a08e00d82f529fe836a3ae5b1348d9ec Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Thu, 3 Oct 2024 01:48:11 +0300 Subject: [PATCH] handle ws exceptions on rpc calls --- crypto/sol/app.py | 87 +++++++++++++++++++++++++++++++---------------- 1 file changed, 58 insertions(+), 29 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 499d449..dd88d3e 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -40,9 +40,6 @@ DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') # Initialize Telegram Bot bot = Bot(token=TELEGRAM_BOT_TOKEN) -# Initialize logging -logging.basicConfig(level=logging.DEBUG) - # Token addresses (initialize with some known tokens) TOKEN_ADDRESSES = { "SOL": "So11111111111111111111111111111111111111112", @@ -462,37 +459,69 @@ async def on_logs(log): async def subscribe_to_wallet(): uri = SOLANA_URL - async with websockets.connect(uri) as websocket: - # Correct the `params` format to be an array - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsSubscribe", - "params": [ - { - "mentions": [FOLLOWED_WALLET] # Changed from YOUR_WALLET to FOLLOWED_WALLET - }, - { - "commitment": "confirmed" + reconnect_delay = 5 # Start with a 5-second delay + max_reconnect_delay = 60 # Maximum delay of 60 seconds + + while True: + try: + async with websockets.connect(uri) as websocket: + logger.info("Connected to Solana websocket") + + request = { + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ + { + "mentions": [FOLLOWED_WALLET] + }, + { + "commitment": "confirmed" + } + ] } - ] - } - await websocket.send(json.dumps(request)) + await websocket.send(json.dumps(request)) + logger.info("Subscription request sent") - # Listen for messages - while True: - response = await websocket.recv() - response_data = json.loads(response) - if 'result' in response_data: - print(f"Subscription successful. Subscription id: {response_data['result']}") - elif 'params' in response_data: - await on_logs(response_data['params']['result']) - else: - print(f"Unexpected response: {response}") - + while True: + try: + response = await websocket.recv() + response_data = json.loads(response) + if 'result' in response_data: + logger.info(f"Subscription successful. Subscription id: {response_data['result']}") + elif 'params' in response_data: + await on_logs(response_data['params']['result']) + else: + logger.warning(f"Unexpected response: {response}") + + except websockets.exceptions.ConnectionClosedError as e: + logger.error(f"Connection closed unexpectedly: {e}") + break + except json.JSONDecodeError as e: + logger.error(f"Failed to decode JSON: {e}") + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + break + + except websockets.exceptions.WebSocketException as e: + logger.error(f"WebSocket error: {e}") + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + + logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") + await asyncio.sleep(reconnect_delay) + + # Implement exponential backoff + reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay) + + +logger = logging.getLogger(__name__) async def main(): + # Initialize logging + logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.INFO) + await send_telegram_message("Solana Agent Application Started") await list_initial_wallet_states() await subscribe_to_wallet()