From bc2a8a2a9d121542b58a4adac9206e4b8fc3d682 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Fri, 11 Oct 2024 00:07:01 +0300 Subject: [PATCH] more info in telegram; better WS connection robustness --- crypto/sol/app.py | 180 ++++++++++++++++++++++++---------------------- 1 file changed, 94 insertions(+), 86 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index fb8499b..21d4dc6 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -38,6 +38,7 @@ import requests import threading import re from typing import List, Dict, Any, Tuple +import random load_dotenv() @@ -775,7 +776,7 @@ async def list_initial_wallet_states(): FOLLOWED_WALLET_VALUE = 0 for address, info in followed_converted_balances.items(): if info['value'] is not None and info['value'] > 0: - followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") + followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})") FOLLOWED_WALLET_VALUE += info['value'] your_wallet_state = [] @@ -1111,101 +1112,107 @@ async def follow_move(move): # Helper functions (implement these according to your needs) +PING_INTERVAL = 30 # seconds +PING_TIMEOUT = 20 # seconds +INITIAL_RECONNECT_DELAY = 5 # seconds +MAX_RECONNECT_DELAY = 60 # seconds +SUBSCRIPTION_CHECK_INTERVAL = 60 # seconds +SOLANA_ENDPOINTS: List[str] = [ + "wss://api.mainnet-beta.solana.com", + "wss://solana-api.projectserum.com", + "wss://rpc.ankr.com/solana", + "wss://mainnet.rpcpool.com", +] -async def heartbeat(websocket): - while True: - try: - await websocket.ping() - await asyncio.sleep(PING_INTERVAL) - except websockets.exceptions.ConnectionClosed: - break +class SolanaSubscriber: + def __init__(self, followed_wallet: str): + self.followed_wallet = followed_wallet + self.subscription_id: str | None = None + self.current_endpoint: str = random.choice(SOLANA_ENDPOINTS) + self.reconnect_delay: int = INITIAL_RECONNECT_DELAY + self.websocket: websockets.WebSocketClientProtocol | None = None + self.send_telegram_message = send_telegram_message -async def subscribe_to_wallet(): - SOLANA_ENDPOINTS = [ - "wss://api.mainnet-beta.solana.com", - "wss://solana-api.projectserum.com", - "wss://rpc.ankr.com/solana", - "wss://mainnet.rpcpool.com", - ] - uri = SOLANA_WS_URL # wss://api.mainnet-beta.solana.com - reconnect_delay = 5 # Start with a 5-second delay - max_reconnect_delay = 60 # Maximum delay of 60 seconds + async def subscribe(self) -> None: + while True: + try: + async with websockets.connect( + self.current_endpoint, + ping_interval=PING_INTERVAL, + ping_timeout=PING_TIMEOUT + ) as self.websocket: + logger.info(f"Connected to Solana websocket: {self.current_endpoint}") + await self.send_subscription_request() + await self.handle_messages() + except Exception as e: + logger.error(f"Connection error: {e}") + await self.handle_reconnection() - while True: - try: - async with websockets.connect(uri, ping_interval=30, ping_timeout=20) as websocket: - logger.info("Connected to Solana websocket") + async def send_subscription_request(self) -> None: + request: Dict[str, Any] = { + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ + {"mentions": [self.followed_wallet]}, + {"commitment": "confirmed"} + ] + } + await self.websocket.send(json.dumps(request)) + logger.info("Subscription request sent") + + async def handle_messages(self) -> None: + subscription_check_time = asyncio.get_event_loop().time() + while True: + try: + response = await asyncio.wait_for(self.websocket.recv(), timeout=PING_INTERVAL*2) + response_data = json.loads(response) - heartbeat_task = asyncio.create_task(heartbeat(websocket)) + if 'result' in response_data: + self.subscription_id = response_data['result'] + logger.info(f"Subscription successful. ID: {self.subscription_id}") + await self.send_telegram_message("Connected to Solana network. Watching for transactions.") + elif 'params' in response_data: + log = response_data['params']['result'] + logging.debug(f"Received transaction log: {log}") + asyncio.create_task(process_log(log)) + else: + logger.warning(f"Unexpected response: {response_data}") - subscription_id = await load_subscription_id() + # Check subscription status periodically + current_time = asyncio.get_event_loop().time() + if current_time - subscription_check_time > SUBSCRIPTION_CHECK_INTERVAL: + if not self.subscription_id: + logger.warning("No active subscription. Resubscribing...") + await self.send_subscription_request() + subscription_check_time = current_time - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsSubscribe", - "params": [ - { - "mentions": [FOLLOWED_WALLET] - }, - { - "commitment": "confirmed" - } - ] - } + except asyncio.TimeoutError: + logger.debug("No message received within ping interval") + except websockets.exceptions.ConnectionClosed as e: + logger.error(f"Connection closed unexpectedly: {e}") + except websockets.exceptions.ConnectionClosedError as e: + logger.error(f"ConnectionClosedError: conn closed unexpectedly: {e}") + await self.send_telegram_message("Connection to Solana network was closed. Attempting to reconnect...") + break + except json.JSONDecodeError as e: + logger.error(f"Failed to decode JSON: {e}") + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + break - await websocket.send(json.dumps(request)) - logger.info("Subscription request sent") - conn_active = False - while True: - try: - response = await websocket.recv() - conn_active = True - response_data = json.loads(response) - logger.debug(f"Received response: {response_data}") - if 'result' in response_data: - subscription_id = response_data['result'] - await save_subscription_id(subscription_id) - logger.info(f"Subscription successful. Subscription id: {subscription_id}") - await send_telegram_message("Connected to Solana network. Watching for transactions now.") + async def handle_reconnection(self) -> None: + logger.info(f"Attempting to reconnect in {self.reconnect_delay} seconds...") + await send_telegram_message(f"Attempting to reconnect to Solana network in {self.reconnect_delay} seconds...") + await asyncio.sleep(self.reconnect_delay) + self.reconnect_delay = min(self.reconnect_delay * 2, MAX_RECONNECT_DELAY) + self.current_endpoint = random.choice(SOLANA_ENDPOINTS) + self.subscription_id = None - elif 'params' in response_data: - log = response_data['params']['result'] - logging.debug(f"Received transaction log: {log}") - # Create a new task for processing the log - asyncio.create_task(process_log(log)) - else: - logger.warning(f"Unexpected response: {response}") - - except websockets.exceptions.ConnectionClosedError as e: - logger.error(f"Connection closed unexpectedly: {e}") - if conn_active: - conn_active = False - await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") - except json.JSONDecodeError as e: - logger.error(f"Failed to decode JSON: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - break - - # Cancel the heartbeat task when the connection is closed - heartbeat_task.cancel() - - except websockets.exceptions.WebSocketException as e: - logger.error(f"WebSocket error: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - - logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") - await asyncio.sleep(reconnect_delay) - # Implement exponential backoff - reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay) - - - + pk = os.getenv("PK") if not pk: try: @@ -1229,7 +1236,8 @@ if not pk: async def main(): await send_telegram_message("Solana Agent Started. Connecting to mainnet...") asyncio.create_task( list_initial_wallet_states()) - await subscribe_to_wallet() + subscriber = SolanaSubscriber(FOLLOWED_WALLET) + await subscriber.subscribe() def run_flask(): # Run Flask app without the reloader, so we can run the async main function