rollback WS; random RPC endpoint
This commit is contained in:
parent
207ef730a0
commit
7d63f60247
@ -38,7 +38,6 @@ import requests
|
|||||||
import threading
|
import threading
|
||||||
import re
|
import re
|
||||||
from typing import List, Dict, Any, Tuple
|
from typing import List, Dict, Any, Tuple
|
||||||
import random
|
|
||||||
|
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
@ -64,16 +63,6 @@ app = Flask(__name__)
|
|||||||
|
|
||||||
ENV_FILE = '.env'
|
ENV_FILE = '.env'
|
||||||
|
|
||||||
async def save_subscription_id(subscription_id):
|
|
||||||
# storing subscription id in .env file disabled
|
|
||||||
#set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id))
|
|
||||||
logger.info(f"Saved subscription ID: {subscription_id}")
|
|
||||||
|
|
||||||
async def load_subscription_id():
|
|
||||||
subscription_id = os.getenv("SUBSCRIPTION_ID")
|
|
||||||
return int(subscription_id) if subscription_id else None
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Function to find the latest log file
|
# Function to find the latest log file
|
||||||
def get_latest_log_file():
|
def get_latest_log_file():
|
||||||
@ -951,6 +940,7 @@ async def process_log(log_result):
|
|||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Error aquiring log details and following: {e}")
|
logging.error(f"Error aquiring log details and following: {e}")
|
||||||
|
send_telegram_message(f"Not followed! Error following move.")
|
||||||
return
|
return
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -1001,6 +991,14 @@ def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
|
|||||||
async def follow_move(move):
|
async def follow_move(move):
|
||||||
your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
||||||
your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
|
your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
|
||||||
|
if your_balance_info is not None:
|
||||||
|
# Use the balance
|
||||||
|
print(f"Your balance: {your_balance_info['amount']}")
|
||||||
|
else:
|
||||||
|
print("No ballance found for {move['symbol_in']}. Skipping move.")
|
||||||
|
send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
|
||||||
|
return
|
||||||
|
|
||||||
your_balance = your_balance_info['amount']
|
your_balance = your_balance_info['amount']
|
||||||
|
|
||||||
|
|
||||||
@ -1112,89 +1110,78 @@ async def follow_move(move):
|
|||||||
|
|
||||||
# Helper functions (implement these according to your needs)
|
# Helper functions (implement these according to your needs)
|
||||||
|
|
||||||
PING_INTERVAL = 30 # seconds
|
|
||||||
PING_TIMEOUT = 20 # seconds
|
|
||||||
INITIAL_RECONNECT_DELAY = 5 # seconds
|
|
||||||
MAX_RECONNECT_DELAY = 60 # seconds
|
|
||||||
SUBSCRIPTION_CHECK_INTERVAL = 60 # seconds
|
|
||||||
|
|
||||||
SOLANA_ENDPOINTS: List[str] = [
|
|
||||||
|
async def heartbeat(websocket):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
await websocket.ping()
|
||||||
|
await asyncio.sleep(PING_INTERVAL)
|
||||||
|
except websockets.exceptions.ConnectionClosed:
|
||||||
|
break
|
||||||
|
|
||||||
|
import random
|
||||||
|
async def subscribe_to_wallet():
|
||||||
|
SOLANA_ENDPOINTS = [
|
||||||
"wss://api.mainnet-beta.solana.com",
|
"wss://api.mainnet-beta.solana.com",
|
||||||
"wss://solana-api.projectserum.com",
|
"wss://solana-api.projectserum.com",
|
||||||
"wss://rpc.ankr.com/solana",
|
"wss://rpc.ankr.com/solana",
|
||||||
"wss://mainnet.rpcpool.com",
|
"wss://mainnet.rpcpool.com",
|
||||||
]
|
]
|
||||||
|
uri = SOLANA_WS_URL # wss://api.mainnet-beta.solana.com
|
||||||
|
reconnect_delay = 5 # Start with a 5-second delay
|
||||||
|
max_reconnect_delay = 10 # Maximum delay of 60 seconds
|
||||||
|
|
||||||
class SolanaSubscriber:
|
|
||||||
def __init__(self, followed_wallet: str):
|
|
||||||
self.followed_wallet = followed_wallet
|
|
||||||
self.subscription_id: str | None = None
|
|
||||||
self.current_endpoint: str = random.choice(SOLANA_ENDPOINTS)
|
|
||||||
self.reconnect_delay: int = INITIAL_RECONNECT_DELAY
|
|
||||||
self.websocket: websockets.WebSocketClientProtocol | None = None
|
|
||||||
self.send_telegram_message = send_telegram_message
|
|
||||||
|
|
||||||
async def subscribe(self) -> None:
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
async with websockets.connect(
|
current_url = random.choice(SOLANA_ENDPOINTS)
|
||||||
self.current_endpoint,
|
async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket:
|
||||||
ping_interval=PING_INTERVAL,
|
logger.info("Connected to Solana websocket")
|
||||||
ping_timeout=PING_TIMEOUT
|
|
||||||
) as self.websocket:
|
|
||||||
logger.info(f"Connected to Solana websocket: {self.current_endpoint}")
|
|
||||||
await self.send_subscription_request()
|
|
||||||
await self.handle_messages()
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Connection error: {e}")
|
|
||||||
await self.handle_reconnection()
|
|
||||||
|
|
||||||
async def send_subscription_request(self) -> None:
|
heartbeat_task = asyncio.create_task(heartbeat(websocket))
|
||||||
request: Dict[str, Any] = {
|
|
||||||
|
request = {
|
||||||
"jsonrpc": "2.0",
|
"jsonrpc": "2.0",
|
||||||
"id": 1,
|
"id": 1,
|
||||||
"method": "logsSubscribe",
|
"method": "logsSubscribe",
|
||||||
"params": [
|
"params": [
|
||||||
{"mentions": [self.followed_wallet]},
|
{
|
||||||
{"commitment": "confirmed"}
|
"mentions": [FOLLOWED_WALLET]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"commitment": "confirmed"
|
||||||
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
await self.websocket.send(json.dumps(request))
|
|
||||||
logger.info("Subscription request sent")
|
|
||||||
|
|
||||||
async def handle_messages(self) -> None:
|
await websocket.send(json.dumps(request))
|
||||||
subscription_check_time = asyncio.get_event_loop().time()
|
logger.info("Subscription request sent")
|
||||||
|
conn_active = False
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
response = await asyncio.wait_for(self.websocket.recv(), timeout=PING_INTERVAL*2)
|
response = await websocket.recv()
|
||||||
|
conn_active = True
|
||||||
response_data = json.loads(response)
|
response_data = json.loads(response)
|
||||||
|
logger.debug(f"Received response: {response_data}")
|
||||||
if 'result' in response_data:
|
if 'result' in response_data:
|
||||||
self.subscription_id = response_data['result']
|
subscription_id = response_data['result']
|
||||||
logger.info(f"Subscription successful. ID: {self.subscription_id}")
|
asyncio.create_task( list_initial_wallet_states())
|
||||||
await self.send_telegram_message("Connected to Solana network. Watching for transactions.")
|
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
|
||||||
|
await send_telegram_message("Connected to Solana network. Watching for transactions now.")
|
||||||
|
|
||||||
elif 'params' in response_data:
|
elif 'params' in response_data:
|
||||||
log = response_data['params']['result']
|
log = response_data['params']['result']
|
||||||
logging.debug(f"Received transaction log: {log}")
|
logging.debug(f"Received transaction log: {log}")
|
||||||
asyncio.create_task(process_log(log))
|
asyncio.create_task(process_log(log))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Unexpected response: {response_data}")
|
logger.warning(f"Unexpected response: {response}")
|
||||||
|
|
||||||
# Check subscription status periodically
|
|
||||||
current_time = asyncio.get_event_loop().time()
|
|
||||||
if current_time - subscription_check_time > SUBSCRIPTION_CHECK_INTERVAL:
|
|
||||||
if not self.subscription_id:
|
|
||||||
logger.warning("No active subscription. Resubscribing...")
|
|
||||||
await self.send_subscription_request()
|
|
||||||
subscription_check_time = current_time
|
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
logger.debug("No message received within ping interval")
|
|
||||||
except websockets.exceptions.ConnectionClosed as e:
|
|
||||||
logger.error(f"Connection closed unexpectedly: {e}")
|
|
||||||
except websockets.exceptions.ConnectionClosedError as e:
|
except websockets.exceptions.ConnectionClosedError as e:
|
||||||
logger.error(f"ConnectionClosedError: conn closed unexpectedly: {e}")
|
logger.error(f"Connection closed unexpectedly: {e}")
|
||||||
await self.send_telegram_message("Connection to Solana network was closed. Attempting to reconnect...")
|
if conn_active:
|
||||||
|
conn_active = False
|
||||||
|
await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now.\n Attempting to reconnect...")
|
||||||
break
|
break
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.error(f"Failed to decode JSON: {e}")
|
logger.error(f"Failed to decode JSON: {e}")
|
||||||
@ -1202,14 +1189,20 @@ class SolanaSubscriber:
|
|||||||
logger.error(f"An unexpected error occurred: {e}")
|
logger.error(f"An unexpected error occurred: {e}")
|
||||||
break
|
break
|
||||||
|
|
||||||
async def handle_reconnection(self) -> None:
|
# Cancel the heartbeat task when the connection is closed
|
||||||
logger.info(f"Attempting to reconnect in {self.reconnect_delay} seconds...")
|
heartbeat_task.cancel()
|
||||||
await send_telegram_message(f"Attempting to reconnect to Solana network in {self.reconnect_delay} seconds...")
|
|
||||||
await asyncio.sleep(self.reconnect_delay)
|
|
||||||
self.reconnect_delay = min(self.reconnect_delay * 2, MAX_RECONNECT_DELAY)
|
|
||||||
self.current_endpoint = random.choice(SOLANA_ENDPOINTS)
|
|
||||||
self.subscription_id = None
|
|
||||||
|
|
||||||
|
except websockets.exceptions.WebSocketException as e:
|
||||||
|
logger.error(f"WebSocket error: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"An unexpected error occurred: {e}")
|
||||||
|
current_url = random.choice(SOLANA_ENDPOINTS)
|
||||||
|
|
||||||
|
logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
|
||||||
|
await asyncio.sleep(reconnect_delay)
|
||||||
|
|
||||||
|
# Implement exponential backoff
|
||||||
|
reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -1235,9 +1228,7 @@ if not pk:
|
|||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
|
await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
|
||||||
asyncio.create_task( list_initial_wallet_states())
|
await subscribe_to_wallet()
|
||||||
subscriber = SolanaSubscriber(FOLLOWED_WALLET)
|
|
||||||
await subscriber.subscribe()
|
|
||||||
|
|
||||||
def run_flask():
|
def run_flask():
|
||||||
# Run Flask app without the reloader, so we can run the async main function
|
# Run Flask app without the reloader, so we can run the async main function
|
||||||
|
Loading…
x
Reference in New Issue
Block a user