From 6ab6e301ec6495f0dbb7b1c9b4088da24292ca64 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 15 Oct 2024 11:01:53 +0300 Subject: [PATCH] fix multiple recv() hooks on the same WS --- crypto/sol/app.py | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 3ab76e2..c14bde0 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -1176,7 +1176,7 @@ async def wallet_watch_loop(): if first_subscription: asyncio.create_task( list_initial_wallet_states()) first_subscription = False - process_task = asyncio.create_task(process_messages(websocket, subscription_id)) + process_task = asyncio.create_task(process_messages(websocket)) while True: try: await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL) @@ -1198,7 +1198,7 @@ async def wallet_watch_loop(): # Already subscribed logger.info("Already subscribed, continuing with existing subscription") if subscription_id: - process_task = asyncio.create_task(process_messages(websocket, subscription_id)) + process_task = asyncio.create_task(process_messages(websocket)) else: # process_messages completed (shouldn't happen unless there's an error) @@ -1236,22 +1236,7 @@ async def subscribe(websocket): try: await websocket.send(json.dumps(request)) logger.info("Subscription request sent") - - response = await websocket.recv() - response_data = json.loads(response) - - if 'result' in response_data: - subscription_id = response_data['result'] - logger.info(f"Subscription successful. Subscription id: {subscription_id}") - return subscription_id - else: - logger.warning(f"Unexpected response: {response_data}") - return None - except websockets.exceptions.ConnectionClosedError as e: - logger.error(f"Connection closed unexpectedly: {e}") - await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") - await websocket.close() - return None + return process_messages(websocket) except Exception as e: logger.error(f"An unexpected error occurred: {e}") return None @@ -1268,14 +1253,18 @@ async def unsubscribe(websocket, subscription_id): logger.info(f"Unsubscribed from subscription id: {subscription_id}") subscription_id = None -async def process_messages(websocket, subscription_id): +async def process_messages(websocket): try: while True: response = await websocket.recv() response_data = json.loads(response) logger.debug(f"Received response: {response_data}") - if 'params' in response_data: + if 'result' in response_data: + subscription_id = response_data['result'] + logger.info(f"Subscription successful. Subscription id: {subscription_id}") + return subscription_id + elif 'params' in response_data: log = response_data['params']['result'] logger.debug(f"Received transaction log: {log}") asyncio.create_task(process_log(log))