"""Watch a Solana wallet through the JSON-RPC websocket and HTTP endpoints.

The module subscribes to log notifications that mention ``FOLLOWED_WALLET``,
queues every notification, and hands each one to ``process_log``.  A small
HTTP JSON-RPC helper and a signature-polling loop are provided as well.
"""
import sys
import os

# Put the parent of this file's directory on the import path before the other
# imports run (presumably so the project-level ``config`` module resolves).
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import asyncio
import datetime
import functools
import json
import logging
import random
from typing import Any, Optional

import requests
import websockets

from config import (
    FOLLOWED_WALLET,
    SOLANA_HTTP_URL,
)

logger = logging.getLogger(__name__)

SOLANA_ENDPOINTS = [
    "wss://api.mainnet-beta.solana.com",
    # "wss://solana-api.projectserum.com",
    # "wss://rpc.ankr.com/solana",
    # "wss://mainnet.rpcpool.com",
]

# Websocket keep-alive ping period, in seconds.
PING_INTERVAL = 30
# 60 seconds.  Not referenced anywhere else in this module.
SUBSCRIBE_INTERVAL = 1 * 60

# Upper bound, in seconds, for one HTTP JSON-RPC round trip.
HTTP_TIMEOUT = 30
# Upper bound, in seconds, for the best-effort unsubscribe during teardown.
UNSUBSCRIBE_TIMEOUT = 10


class SolanaAPI:
    """Thin wrapper around one Solana JSON-RPC websocket connection.

    Attributes are plain instance state:
      * ``websocket``       - the open connection, or ``None`` when closed.
      * ``subscription_id`` - id returned by ``logsSubscribe``, or ``None``.
      * ``message_queue``   - raw notification frames awaiting processing.
    """

    def __init__(self):
        self.websocket: Optional[Any] = None
        self.subscription_id: Optional[Any] = None
        self.message_queue: asyncio.Queue = asyncio.Queue()

    async def connect(self):
        """Connect to a randomly chosen endpoint, retrying every 5 seconds."""
        while True:
            # Chosen outside the ``try`` so the error message below can never
            # reference an unbound name.
            current_url = random.choice(SOLANA_ENDPOINTS)
            try:
                self.websocket = await websockets.connect(
                    current_url,
                    ping_interval=PING_INTERVAL,
                    ping_timeout=20,
                )
            except Exception as e:
                logger.error("Failed to connect to %s: %s", current_url, e)
                await asyncio.sleep(5)
            else:
                logger.info("Connected to Solana websocket: %s", current_url)
                return

    async def ws_jsonrpc(self, method, params=None):
        """Send one JSON-RPC request over the websocket and return its result.

        Args:
            method: JSON-RPC method name.
            params: A list of positional parameters, a single parameter
                (wrapped in a list), or ``None`` for no parameters.

        Returns:
            The ``result`` member of the reply, or ``None`` when the reply
            carries an ``error`` or has an unexpected shape.
        """
        if params is None:
            params = []
        elif not isinstance(params, list):
            params = [params]

        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params,
        }
        await self.websocket.send(json.dumps(request))

        while True:
            response = await self.websocket.recv()
            response_data = json.loads(response)
            # A frame with a "method" but no "id" is a server notification,
            # not the reply to this request.  Queue it for normal processing
            # and keep waiting for the actual reply.
            is_notification = (
                isinstance(response_data, dict)
                and "method" in response_data
                and "id" not in response_data
            )
            if not is_notification:
                break
            await self.message_queue.put(response)

        if 'result' in response_data:
            return response_data['result']
        if 'error' in response_data:
            logger.error("Error in WebSocket RPC call: %s", response_data['error'])
            return None
        logger.warning("Unexpected response: %s", response_data)
        return None

    async def subscribe(self):
        """Subscribe to confirmed logs mentioning ``FOLLOWED_WALLET``."""
        params = [
            {"mentions": [FOLLOWED_WALLET]},
            {"commitment": "confirmed"},
        ]
        result = await self.ws_jsonrpc("logsSubscribe", params)
        if result is not None:
            self.subscription_id = result
            logger.info(
                "Subscription successful. Subscription id: %s",
                self.subscription_id,
            )
        else:
            logger.error("Failed to subscribe")

    async def unsubscribe(self):
        """Cancel the active log subscription, if there is one."""
        # ``is None`` rather than truthiness, matching the check in subscribe().
        if self.subscription_id is None:
            return
        result = await self.ws_jsonrpc("logsUnsubscribe", [self.subscription_id])
        if result:
            logger.info("Unsubscribed from subscription id: %s", self.subscription_id)
            self.subscription_id = None
        else:
            logger.error(
                "Failed to unsubscribe from subscription id: %s",
                self.subscription_id,
            )

    async def receive_messages(self):
        """Move incoming frames onto ``message_queue`` until the socket fails.

        Returns (rather than raises) when the connection closes or any other
        receive error occurs.
        """
        while True:
            try:
                message = await self.websocket.recv()
            except websockets.exceptions.ConnectionClosed:
                # Covers both clean and abnormal closes.
                logger.error("WebSocket connection closed")
                break
            except Exception as e:
                logger.error("Error receiving message: %s", e)
                break
            await self.message_queue.put(message)

    async def process_messages(self):
        """Decode queued frames forever and pass log payloads to ``process_log``.

        Every dequeued item is acknowledged with ``task_done()`` whether or
        not it could be processed.
        """
        while True:
            message = await self.message_queue.get()
            logger.info("Received message: %s", message)
            try:
                response_data = json.loads(message)
                if 'params' in response_data:
                    log = response_data['params']['result']
                    await process_log(log)
                else:
                    logger.warning("Unexpected response: %s", response_data)
            except json.JSONDecodeError as e:
                logger.error("Failed to decode JSON: %s", e)
            except Exception as e:
                # One bad message must not stop the consumer.
                logger.error(
                    "An unexpected error occurred while processing message: %s", e
                )
            finally:
                self.message_queue.task_done()

    async def close(self):
        """Close the websocket (if open) and forget the connection state."""
        if self.websocket:
            await self.websocket.close()
            logger.info("WebSocket connection closed")
        self.websocket = None
        # A subscription does not outlive the connection it was made on.
        self.subscription_id = None


async def solana_jsonrpc(method, params=None, jsonParsed=True):
    """Call a Solana HTTP JSON-RPC method and return its ``result``.

    The request always carries an options object containing
    ``maxSupportedTransactionVersion: 0`` and, when ``jsonParsed`` is true,
    ``encoding: "jsonParsed"``.  If the last positional parameter is already
    a dict, the options are merged into a copy of it (caller values win);
    otherwise the options object is appended.  The caller's ``params`` is
    never modified.

    Target payload example::

        {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "getTransaction",
            "params": [
                tx_signature,
                {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}
            ]
        }

    Args:
        method: JSON-RPC method name.
        params: A list of positional parameters, a single parameter, or
            ``None`` for no positional parameters.
        jsonParsed: Whether to request the ``jsonParsed`` encoding.

    Returns:
        The ``result`` member of the reply, or ``None`` on any transport,
        HTTP, decoding or RPC-level error.
    """
    if params is None:
        rpc_params = []
    elif isinstance(params, list):
        rpc_params = list(params)  # copy: never mutate the caller's list
    else:
        rpc_params = [params]

    options = {"maxSupportedTransactionVersion": 0}
    if jsonParsed:
        options["encoding"] = "jsonParsed"

    if rpc_params and isinstance(rpc_params[-1], dict):
        rpc_params[-1] = {**options, **rpc_params[-1]}
    else:
        rpc_params.append(options)

    data = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": rpc_params,
    }

    post = functools.partial(
        requests.post,
        SOLANA_HTTP_URL,
        headers={"Content-Type": "application/json"},
        data=json.dumps(data),
        timeout=HTTP_TIMEOUT,
    )
    try:
        # ``requests`` is blocking; run it in the default executor so the
        # event loop (and the websocket tasks) keep running meanwhile.
        response = await asyncio.get_running_loop().run_in_executor(None, post)
        response.raise_for_status()  # Raises an error for bad responses
        result = response.json()
    except Exception as e:
        # Deliberate best-effort boundary: callers only ever see ``None``.
        logger.error("Error fetching data from Solana RPC: %s", e)
        return None

    if not isinstance(result, dict) or 'result' not in result or 'error' in result:
        logger.error("Error fetching data from Solana RPC: %s", result)
        return None
    return result['result']


async def process_log(log):
    """Handle one log notification payload (placeholder, does nothing)."""
    # Implement your log processing logic here
    pass


async def send_telegram_message(message):
    """Send ``message`` to Telegram (placeholder, does nothing)."""
    # Implement your Telegram message sending logic here
    pass


async def list_initial_wallet_states():
    """List the initial wallet states (placeholder, does nothing)."""
    # Implement your initial wallet state listing logic here
    pass


async def _cancel_tasks(*tasks):
    """Cancel the given tasks (``None`` entries allowed) and wait for them."""
    pending = [task for task in tasks if task is not None and not task.done()]
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)


async def _shutdown_connection(solana_ws):
    """Best-effort unsubscribe and close; never raises ordinary errors."""
    try:
        # Bounded so a silent peer cannot stall the reconnect cycle.
        await asyncio.wait_for(solana_ws.unsubscribe(), timeout=UNSUBSCRIBE_TIMEOUT)
    except Exception as e:
        logger.warning("Failed to unsubscribe cleanly: %s", e)
    try:
        await solana_ws.close()
    except Exception as e:
        logger.warning("Failed to close websocket cleanly: %s", e)


async def wallet_watch_loop():
    """Keep a log subscription alive forever, reconnecting after any failure.

    Each cycle connects, subscribes, and runs the receive and process tasks
    until either one stops; it then tears the connection down, waits five
    seconds and starts over.  Cancellation of this coroutine is propagated
    after the connection has been cleaned up.
    """
    solana_ws = SolanaAPI()
    # Held so the one-off background task cannot be garbage-collected early.
    initial_state_task = None

    while True:
        receive_task = None
        process_task = None
        try:
            await solana_ws.connect()
            await solana_ws.subscribe()
            if solana_ws.subscription_id is None:
                raise RuntimeError("Failed to subscribe")

            if initial_state_task is None:
                initial_state_task = asyncio.create_task(list_initial_wallet_states())

            await send_telegram_message(
                f"Solana mainnet connected ({solana_ws.subscription_id})..."
            )

            receive_task = asyncio.create_task(solana_ws.receive_messages())
            process_task = asyncio.create_task(solana_ws.process_messages())
            # process_messages() never returns on its own, so wait for the
            # FIRST task to finish: a dropped connection ends receive_messages()
            # and must trigger a reconnect.
            done, _ = await asyncio.wait(
                {receive_task, process_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
            for task in done:
                if not task.cancelled() and task.exception() is not None:
                    logger.error("An unexpected error occurred: %s", task.exception())
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")
        finally:
            await _cancel_tasks(receive_task, process_task)
            await _shutdown_connection(solana_ws)

        try:
            await send_telegram_message("Reconnecting...")
        except Exception as e:
            logger.error("Failed to send reconnect notification: %s", e)
        await asyncio.sleep(5)


# Example usage
# async def main():
#     account_address = "Vote111111111111111111111111111111111111111"


async def get_last_transactions(account_address, check_interval=300, limit=1000):
    """Poll ``getSignaturesForAddress`` forever and process new signatures.

    Args:
        account_address: Address whose signatures are fetched.
        check_interval: Minimum number of seconds between two RPC polls.
        limit: Maximum number of signatures requested per poll.

    The first poll processes up to ``limit`` signatures; later polls ask only
    for signatures up to (not including) the newest one already seen.
    Entries are processed in the order the RPC returns them.
    """
    last_check_time = None
    last_signature = None

    while True:
        current_time = datetime.datetime.now()
        due = (
            last_check_time is None
            or (current_time - last_check_time).total_seconds() >= check_interval
        )
        if due:
            options = {"limit": limit}
            if last_signature:
                # "until" stops the backwards search at the newest signature
                # already handled, so only newer entries come back.
                options["until"] = last_signature

            # No encoding applies to signature listings, so it is not requested.
            result = await solana_jsonrpc(
                "getSignaturesForAddress",
                [account_address, options],
                jsonParsed=False,
            )
            if result:
                for signature in result:
                    # Process the transaction
                    await process_transaction(signature)
                # Assumes the RPC lists the newest signature first - TODO confirm.
                last_signature = result[0]['signature']
            last_check_time = current_time

        await asyncio.sleep(1)  # Sleep for 1 second before checking again


async def process_transaction(signature):
    """Handle one signature entry; currently just prints its signature."""
    # Implement your logic to process each transaction
    print(f"Processing transaction: {signature['signature']}")
    # You can add more processing logic here, such as storing in a database,
    # triggering notifications, etc.


if __name__ == "__main__":
    asyncio.run(wallet_watch_loop())