fix multiple recv() hooks on the same WS

This commit is contained in:
Dobromir Popov 2024-10-15 11:01:53 +03:00
parent c8fdbf3c61
commit 6ab6e301ec

View File

@ -1176,7 +1176,7 @@ async def wallet_watch_loop():
if first_subscription: if first_subscription:
asyncio.create_task( list_initial_wallet_states()) asyncio.create_task( list_initial_wallet_states())
first_subscription = False first_subscription = False
process_task = asyncio.create_task(process_messages(websocket, subscription_id)) process_task = asyncio.create_task(process_messages(websocket))
while True: while True:
try: try:
await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL) await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL)
@ -1198,7 +1198,7 @@ async def wallet_watch_loop():
# Already subscribed # Already subscribed
logger.info("Already subscribed, continuing with existing subscription") logger.info("Already subscribed, continuing with existing subscription")
if subscription_id: if subscription_id:
process_task = asyncio.create_task(process_messages(websocket, subscription_id)) process_task = asyncio.create_task(process_messages(websocket))
else: else:
# process_messages completed (shouldn't happen unless there's an error) # process_messages completed (shouldn't happen unless there's an error)
@ -1236,22 +1236,7 @@ async def subscribe(websocket):
try: try:
await websocket.send(json.dumps(request)) await websocket.send(json.dumps(request))
logger.info("Subscription request sent") logger.info("Subscription request sent")
return process_messages(websocket)
response = await websocket.recv()
response_data = json.loads(response)
if 'result' in response_data:
subscription_id = response_data['result']
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
return subscription_id
else:
logger.warning(f"Unexpected response: {response_data}")
return None
except websockets.exceptions.ConnectionClosedError as e:
logger.error(f"Connection closed unexpectedly: {e}")
await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
await websocket.close()
return None
except Exception as e: except Exception as e:
logger.error(f"An unexpected error occurred: {e}") logger.error(f"An unexpected error occurred: {e}")
return None return None
@ -1268,14 +1253,18 @@ async def unsubscribe(websocket, subscription_id):
logger.info(f"Unsubscribed from subscription id: {subscription_id}") logger.info(f"Unsubscribed from subscription id: {subscription_id}")
subscription_id = None subscription_id = None
async def process_messages(websocket, subscription_id): async def process_messages(websocket):
try: try:
while True: while True:
response = await websocket.recv() response = await websocket.recv()
response_data = json.loads(response) response_data = json.loads(response)
logger.debug(f"Received response: {response_data}") logger.debug(f"Received response: {response_data}")
if 'params' in response_data: if 'result' in response_data:
subscription_id = response_data['result']
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
return subscription_id
elif 'params' in response_data:
log = response_data['params']['result'] log = response_data['params']['result']
logger.debug(f"Received transaction log: {log}") logger.debug(f"Received transaction log: {log}")
asyncio.create_task(process_log(log)) asyncio.create_task(process_log(log))