more info in telegram;

better WS connection robustness
This commit is contained in:
Dobromir Popov 2024-10-11 00:07:01 +03:00
parent 16efc143c8
commit bc2a8a2a9d

View File

@ -38,6 +38,7 @@ import requests
import threading import threading
import re import re
from typing import List, Dict, Any, Tuple from typing import List, Dict, Any, Tuple
import random
load_dotenv() load_dotenv()
@ -775,7 +776,7 @@ async def list_initial_wallet_states():
FOLLOWED_WALLET_VALUE = 0 FOLLOWED_WALLET_VALUE = 0
for address, info in followed_converted_balances.items(): for address, info in followed_converted_balances.items():
if info['value'] is not None and info['value'] > 0: if info['value'] is not None and info['value'] > 0:
followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})")
FOLLOWED_WALLET_VALUE += info['value'] FOLLOWED_WALLET_VALUE += info['value']
your_wallet_state = [] your_wallet_state = []
@ -1111,101 +1112,107 @@ async def follow_move(move):
# Helper functions (implement these according to your needs) # Helper functions (implement these according to your needs)
PING_INTERVAL = 30 # seconds
PING_TIMEOUT = 20 # seconds
INITIAL_RECONNECT_DELAY = 5 # seconds
MAX_RECONNECT_DELAY = 60 # seconds
SUBSCRIPTION_CHECK_INTERVAL = 60 # seconds
SOLANA_ENDPOINTS: List[str] = [
"wss://api.mainnet-beta.solana.com",
"wss://solana-api.projectserum.com",
"wss://rpc.ankr.com/solana",
"wss://mainnet.rpcpool.com",
]
async def heartbeat(websocket): class SolanaSubscriber:
while True: def __init__(self, followed_wallet: str):
try: self.followed_wallet = followed_wallet
await websocket.ping() self.subscription_id: str | None = None
await asyncio.sleep(PING_INTERVAL) self.current_endpoint: str = random.choice(SOLANA_ENDPOINTS)
except websockets.exceptions.ConnectionClosed: self.reconnect_delay: int = INITIAL_RECONNECT_DELAY
break self.websocket: websockets.WebSocketClientProtocol | None = None
self.send_telegram_message = send_telegram_message
async def subscribe_to_wallet(): async def subscribe(self) -> None:
SOLANA_ENDPOINTS = [ while True:
"wss://api.mainnet-beta.solana.com", try:
"wss://solana-api.projectserum.com", async with websockets.connect(
"wss://rpc.ankr.com/solana", self.current_endpoint,
"wss://mainnet.rpcpool.com", ping_interval=PING_INTERVAL,
] ping_timeout=PING_TIMEOUT
uri = SOLANA_WS_URL # wss://api.mainnet-beta.solana.com ) as self.websocket:
reconnect_delay = 5 # Start with a 5-second delay logger.info(f"Connected to Solana websocket: {self.current_endpoint}")
max_reconnect_delay = 60 # Maximum delay of 60 seconds await self.send_subscription_request()
await self.handle_messages()
except Exception as e:
logger.error(f"Connection error: {e}")
await self.handle_reconnection()
while True: async def send_subscription_request(self) -> None:
try: request: Dict[str, Any] = {
async with websockets.connect(uri, ping_interval=30, ping_timeout=20) as websocket: "jsonrpc": "2.0",
logger.info("Connected to Solana websocket") "id": 1,
"method": "logsSubscribe",
"params": [
{"mentions": [self.followed_wallet]},
{"commitment": "confirmed"}
]
}
await self.websocket.send(json.dumps(request))
logger.info("Subscription request sent")
async def handle_messages(self) -> None:
subscription_check_time = asyncio.get_event_loop().time()
while True:
try:
response = await asyncio.wait_for(self.websocket.recv(), timeout=PING_INTERVAL*2)
response_data = json.loads(response)
heartbeat_task = asyncio.create_task(heartbeat(websocket)) if 'result' in response_data:
self.subscription_id = response_data['result']
logger.info(f"Subscription successful. ID: {self.subscription_id}")
await self.send_telegram_message("Connected to Solana network. Watching for transactions.")
elif 'params' in response_data:
log = response_data['params']['result']
logging.debug(f"Received transaction log: {log}")
asyncio.create_task(process_log(log))
else:
logger.warning(f"Unexpected response: {response_data}")
subscription_id = await load_subscription_id() # Check subscription status periodically
current_time = asyncio.get_event_loop().time()
if current_time - subscription_check_time > SUBSCRIPTION_CHECK_INTERVAL:
if not self.subscription_id:
logger.warning("No active subscription. Resubscribing...")
await self.send_subscription_request()
subscription_check_time = current_time
request = { except asyncio.TimeoutError:
"jsonrpc": "2.0", logger.debug("No message received within ping interval")
"id": 1, except websockets.exceptions.ConnectionClosed as e:
"method": "logsSubscribe", logger.error(f"Connection closed unexpectedly: {e}")
"params": [ except websockets.exceptions.ConnectionClosedError as e:
{ logger.error(f"ConnectionClosedError: conn closed unexpectedly: {e}")
"mentions": [FOLLOWED_WALLET] await self.send_telegram_message("Connection to Solana network was closed. Attempting to reconnect...")
}, break
{ except json.JSONDecodeError as e:
"commitment": "confirmed" logger.error(f"Failed to decode JSON: {e}")
} except Exception as e:
] logger.error(f"An unexpected error occurred: {e}")
} break
await websocket.send(json.dumps(request)) async def handle_reconnection(self) -> None:
logger.info("Subscription request sent") logger.info(f"Attempting to reconnect in {self.reconnect_delay} seconds...")
conn_active = False await send_telegram_message(f"Attempting to reconnect to Solana network in {self.reconnect_delay} seconds...")
while True: await asyncio.sleep(self.reconnect_delay)
try: self.reconnect_delay = min(self.reconnect_delay * 2, MAX_RECONNECT_DELAY)
response = await websocket.recv() self.current_endpoint = random.choice(SOLANA_ENDPOINTS)
conn_active = True self.subscription_id = None
response_data = json.loads(response)
logger.debug(f"Received response: {response_data}")
if 'result' in response_data:
subscription_id = response_data['result']
await save_subscription_id(subscription_id)
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
await send_telegram_message("Connected to Solana network. Watching for transactions now.")
elif 'params' in response_data:
log = response_data['params']['result']
logging.debug(f"Received transaction log: {log}")
# Create a new task for processing the log
asyncio.create_task(process_log(log))
else:
logger.warning(f"Unexpected response: {response}")
except websockets.exceptions.ConnectionClosedError as e:
logger.error(f"Connection closed unexpectedly: {e}")
if conn_active:
conn_active = False
await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
break
# Cancel the heartbeat task when the connection is closed
heartbeat_task.cancel()
except websockets.exceptions.WebSocketException as e:
logger.error(f"WebSocket error: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
await asyncio.sleep(reconnect_delay)
# Implement exponential backoff
reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)
pk = os.getenv("PK") pk = os.getenv("PK")
if not pk: if not pk:
try: try:
@ -1229,7 +1236,8 @@ if not pk:
async def main(): async def main():
await send_telegram_message("Solana Agent Started. Connecting to mainnet...") await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
asyncio.create_task( list_initial_wallet_states()) asyncio.create_task( list_initial_wallet_states())
await subscribe_to_wallet() subscriber = SolanaSubscriber(FOLLOWED_WALLET)
await subscriber.subscribe()
def run_flask(): def run_flask():
# Run Flask app without the reloader, so we can run the async main function # Run Flask app without the reloader, so we can run the async main function