diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 21d4dc6..c0a4899 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -38,7 +38,6 @@ import requests import threading import re from typing import List, Dict, Any, Tuple -import random load_dotenv() @@ -64,16 +63,6 @@ app = Flask(__name__) ENV_FILE = '.env' -async def save_subscription_id(subscription_id): - # storing subscription id in .env file disabled - #set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id)) - logger.info(f"Saved subscription ID: {subscription_id}") - -async def load_subscription_id(): - subscription_id = os.getenv("SUBSCRIPTION_ID") - return int(subscription_id) if subscription_id else None - - # Function to find the latest log file def get_latest_log_file(): @@ -951,6 +940,7 @@ async def process_log(log_result): except Exception as e: logging.error(f"Error aquiring log details and following: {e}") + send_telegram_message(f"Not followed! Error following move.") return except Exception as e: @@ -1001,6 +991,14 @@ def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: async def follow_move(move): your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) + if your_balance_info is not None: + # Use the balance + print(f"Your balance: {your_balance_info['amount']}") + else: + print("No ballance found for {move['symbol_in']}. Skipping move.") + send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") + return + your_balance = your_balance_info['amount'] @@ -1112,107 +1110,102 @@ async def follow_move(move): # Helper functions (implement these according to your needs) -PING_INTERVAL = 30 # seconds -PING_TIMEOUT = 20 # seconds -INITIAL_RECONNECT_DELAY = 5 # seconds -MAX_RECONNECT_DELAY = 60 # seconds -SUBSCRIPTION_CHECK_INTERVAL = 60 # seconds -SOLANA_ENDPOINTS: List[str] = [ - "wss://api.mainnet-beta.solana.com", - "wss://solana-api.projectserum.com", - "wss://rpc.ankr.com/solana", - "wss://mainnet.rpcpool.com", -] -class SolanaSubscriber: - def __init__(self, followed_wallet: str): - self.followed_wallet = followed_wallet - self.subscription_id: str | None = None - self.current_endpoint: str = random.choice(SOLANA_ENDPOINTS) - self.reconnect_delay: int = INITIAL_RECONNECT_DELAY - self.websocket: websockets.WebSocketClientProtocol | None = None - self.send_telegram_message = send_telegram_message +async def heartbeat(websocket): + while True: + try: + await websocket.ping() + await asyncio.sleep(PING_INTERVAL) + except websockets.exceptions.ConnectionClosed: + break - async def subscribe(self) -> None: - while True: - try: - async with websockets.connect( - self.current_endpoint, - ping_interval=PING_INTERVAL, - ping_timeout=PING_TIMEOUT - ) as self.websocket: - logger.info(f"Connected to Solana websocket: {self.current_endpoint}") - await self.send_subscription_request() - await self.handle_messages() - except Exception as e: - logger.error(f"Connection error: {e}") - await self.handle_reconnection() +import random +async def subscribe_to_wallet(): + SOLANA_ENDPOINTS = [ + "wss://api.mainnet-beta.solana.com", + "wss://solana-api.projectserum.com", + "wss://rpc.ankr.com/solana", + "wss://mainnet.rpcpool.com", + ] + uri = SOLANA_WS_URL # wss://api.mainnet-beta.solana.com + reconnect_delay = 5 # Start with a 5-second delay + max_reconnect_delay = 10 # Maximum delay of 60 seconds - async def send_subscription_request(self) -> None: - request: Dict[str, Any] = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsSubscribe", - "params": [ - {"mentions": [self.followed_wallet]}, - {"commitment": "confirmed"} - ] - } - await self.websocket.send(json.dumps(request)) - logger.info("Subscription request sent") - - async def handle_messages(self) -> None: - subscription_check_time = asyncio.get_event_loop().time() - while True: - try: - response = await asyncio.wait_for(self.websocket.recv(), timeout=PING_INTERVAL*2) - response_data = json.loads(response) + while True: + try: + current_url = random.choice(SOLANA_ENDPOINTS) + async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket: + logger.info("Connected to Solana websocket") - if 'result' in response_data: - self.subscription_id = response_data['result'] - logger.info(f"Subscription successful. ID: {self.subscription_id}") - await self.send_telegram_message("Connected to Solana network. Watching for transactions.") - elif 'params' in response_data: - log = response_data['params']['result'] - logging.debug(f"Received transaction log: {log}") - asyncio.create_task(process_log(log)) - else: - logger.warning(f"Unexpected response: {response_data}") + heartbeat_task = asyncio.create_task(heartbeat(websocket)) - # Check subscription status periodically - current_time = asyncio.get_event_loop().time() - if current_time - subscription_check_time > SUBSCRIPTION_CHECK_INTERVAL: - if not self.subscription_id: - logger.warning("No active subscription. Resubscribing...") - await self.send_subscription_request() - subscription_check_time = current_time + request = { + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ + { + "mentions": [FOLLOWED_WALLET] + }, + { + "commitment": "confirmed" + } + ] + } + + await websocket.send(json.dumps(request)) + logger.info("Subscription request sent") + conn_active = False + while True: + try: + response = await websocket.recv() + conn_active = True + response_data = json.loads(response) + logger.debug(f"Received response: {response_data}") + if 'result' in response_data: + subscription_id = response_data['result'] + asyncio.create_task( list_initial_wallet_states()) + logger.info(f"Subscription successful. Subscription id: {subscription_id}") + await send_telegram_message("Connected to Solana network. Watching for transactions now.") - except asyncio.TimeoutError: - logger.debug("No message received within ping interval") - except websockets.exceptions.ConnectionClosed as e: - logger.error(f"Connection closed unexpectedly: {e}") - except websockets.exceptions.ConnectionClosedError as e: - logger.error(f"ConnectionClosedError: conn closed unexpectedly: {e}") - await self.send_telegram_message("Connection to Solana network was closed. Attempting to reconnect...") - break - except json.JSONDecodeError as e: - logger.error(f"Failed to decode JSON: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - break + elif 'params' in response_data: + log = response_data['params']['result'] + logging.debug(f"Received transaction log: {log}") + asyncio.create_task(process_log(log)) - async def handle_reconnection(self) -> None: - logger.info(f"Attempting to reconnect in {self.reconnect_delay} seconds...") - await send_telegram_message(f"Attempting to reconnect to Solana network in {self.reconnect_delay} seconds...") - await asyncio.sleep(self.reconnect_delay) - self.reconnect_delay = min(self.reconnect_delay * 2, MAX_RECONNECT_DELAY) - self.current_endpoint = random.choice(SOLANA_ENDPOINTS) - self.subscription_id = None + else: + logger.warning(f"Unexpected response: {response}") + + except websockets.exceptions.ConnectionClosedError as e: + logger.error(f"Connection closed unexpectedly: {e}") + if conn_active: + conn_active = False + await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now.\n Attempting to reconnect...") + break + except json.JSONDecodeError as e: + logger.error(f"Failed to decode JSON: {e}") + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + break + # Cancel the heartbeat task when the connection is closed + heartbeat_task.cancel() + except websockets.exceptions.WebSocketException as e: + logger.error(f"WebSocket error: {e}") + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + current_url = random.choice(SOLANA_ENDPOINTS) + + logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") + await asyncio.sleep(reconnect_delay) - + # Implement exponential backoff + reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay) + + + pk = os.getenv("PK") if not pk: try: @@ -1235,9 +1228,7 @@ if not pk: async def main(): await send_telegram_message("Solana Agent Started. Connecting to mainnet...") - asyncio.create_task( list_initial_wallet_states()) - subscriber = SolanaSubscriber(FOLLOWED_WALLET) - await subscriber.subscribe() + await subscribe_to_wallet() def run_flask(): # Run Flask app without the reloader, so we can run the async main function