This commit is contained in:
Dobromir Popov 2024-10-11 10:59:49 +03:00
parent 49a78083c4
commit eeeab53e11

View File

@ -38,6 +38,7 @@ import requests
import threading import threading
import re import re
from typing import List, Dict, Any, Tuple from typing import List, Dict, Any, Tuple
import random
load_dotenv() load_dotenv()
@ -1108,8 +1109,15 @@ async def follow_move(move):
await send_telegram_message(error_message) await send_telegram_message(error_message)
# Helper functions (implement these according to your needs) # Helper functions
SOLANA_ENDPOINTS = [
"wss://api.mainnet-beta.solana.com",
# "wss://solana-api.projectserum.com",
# "wss://rpc.ankr.com/solana",
# "wss://mainnet.rpcpool.com",
]
PING_INTERVAL = 30
SUBSCRIBE_INTERVAL = 180 # Resubscribe every 3 minutes
async def heartbeat(websocket): async def heartbeat(websocket):
@ -1120,89 +1128,98 @@ async def heartbeat(websocket):
except websockets.exceptions.ConnectionClosed: except websockets.exceptions.ConnectionClosed:
break break
import random first_subscription = True
async def subscribe_to_wallet(): async def subscribe_to_wallet():
SOLANA_ENDPOINTS = [ reconnect_delay = 5
"wss://api.mainnet-beta.solana.com", max_reconnect_delay = 60
"wss://solana-api.projectserum.com",
"wss://rpc.ankr.com/solana",
"wss://mainnet.rpcpool.com",
]
uri = SOLANA_WS_URL # wss://api.mainnet-beta.solana.com
reconnect_delay = 5 # Start with a 5-second delay
max_reconnect_delay = 10 # Maximum delay of 60 seconds
while True: while True:
try: try:
current_url = random.choice(SOLANA_ENDPOINTS) current_url = random.choice(SOLANA_ENDPOINTS)
async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket: async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket:
logger.info("Connected to Solana websocket") logger.info(f"Connected to Solana websocket: {current_url}")
heartbeat_task = asyncio.create_task(heartbeat(websocket)) heartbeat_task = asyncio.create_task(heartbeat(websocket))
if first_subscription:
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "logsSubscribe",
"params": [
{
"mentions": [FOLLOWED_WALLET]
},
{
"commitment": "confirmed"
}
]
}
await websocket.send(json.dumps(request))
logger.info("Subscription request sent")
conn_active = False
while True:
try:
response = await websocket.recv()
conn_active = True
response_data = json.loads(response)
logger.debug(f"Received response: {response_data}")
if 'result' in response_data:
subscription_id = response_data['result']
asyncio.create_task( list_initial_wallet_states()) asyncio.create_task( list_initial_wallet_states())
logger.info(f"Subscription successful. Subscription id: {subscription_id}") first_subscription = False
await send_telegram_message("Connected to Solana network. Watching for transactions now.")
elif 'params' in response_data: while True:
log = response_data['params']['result'] subscription_id = await subscribe(websocket)
logging.debug(f"Received transaction log: {log}") if subscription_id:
asyncio.create_task(process_log(log)) await process_messages(websocket, subscription_id)
else: await asyncio.sleep(SUBSCRIBE_INTERVAL)
logger.warning(f"Unexpected response: {response}") await unsubscribe(websocket, subscription_id)
except websockets.exceptions.ConnectionClosedError as e:
logger.error(f"Connection closed unexpectedly: {e}")
if conn_active:
conn_active = False
await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now.\n Attempting to reconnect...")
break
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
break
# Cancel the heartbeat task when the connection is closed
heartbeat_task.cancel() heartbeat_task.cancel()
except websockets.exceptions.WebSocketException as e: except websockets.exceptions.WebSocketException as e:
logger.error(f"WebSocket error: {e}") logger.error(f"WebSocket error: {e}")
except Exception as e: except Exception as e:
logger.error(f"An unexpected error occurred: {e}") logger.error(f"An unexpected error occurred: {e}")
current_url = random.choice(SOLANA_ENDPOINTS)
logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
await asyncio.sleep(reconnect_delay) await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 1.2, max_reconnect_delay)
# Implement exponential backoff async def subscribe(websocket):
reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay) request = {
"jsonrpc": "2.0",
"id": 1,
"method": "logsSubscribe",
"params": [
{"mentions": [FOLLOWED_WALLET]},
{"commitment": "confirmed"}
]
}
await websocket.send(json.dumps(request))
logger.info("Subscription request sent")
response = await websocket.recv()
response_data = json.loads(response)
if 'result' in response_data:
subscription_id = response_data['result']
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
await send_telegram_message("Connected to Solana network. Watching for transactions now.")
return subscription_id
else:
logger.warning(f"Unexpected response: {response_data}")
return None
async def unsubscribe(websocket, subscription_id):
if subscription_id:
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "logsUnsubscribe",
"params": [subscription_id]
}
await websocket.send(json.dumps(request))
logger.info(f"Unsubscribed from subscription id: {subscription_id}")
async def process_messages(websocket, subscription_id):
try:
while True:
response = await websocket.recv()
response_data = json.loads(response)
logger.debug(f"Received response: {response_data}")
if 'params' in response_data:
log = response_data['params']['result']
logger.debug(f"Received transaction log: {log}")
asyncio.create_task(process_log(log))
else:
logger.warning(f"Unexpected response: {response_data}")
except websockets.exceptions.ConnectionClosedError as e:
logger.error(f"Connection closed unexpectedly: {e}")
await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
pk = os.getenv("PK") pk = os.getenv("PK")