diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 737bc4b..6c68b93 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -55,9 +55,10 @@ from config import ( error_logger ) -from modules.utils import (send_telegram_message, get_token_prices, get_sol_price, get_wallet_balances, convert_balances_to_currency, get_swap_transaction_details) +from modules.utils import (get_token_prices, get_sol_price, get_wallet_balances, convert_balances_to_currency, get_swap_transaction_details) -from modules.SolanaAPI import SolanaAPI, solana_jsonrpc +from modules.SolanaAPI import SolanaAPI, solana_jsonrpc, wallet_watch_loop +from modules.utils import telegram_utils, send_telegram_message # # config = load_config() # load_dotenv() @@ -139,6 +140,11 @@ except Exception as e: logging.error(f"Error loading token info: {str(e)}") # # # # # # # # # # TELEGRAM # # # # # # # # # # +if not telegram_utils.bot: + try: + asyncio.run(telegram_utils.initialize()) + except Exception as e: + logging.error(f"Error initializing Telegram bot: {str(e)}") # async def send_telegram_message(message): # try: # await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) @@ -1210,167 +1216,6 @@ async def follow_move(move): # Helper functions -SOLANA_ENDPOINTS = [ - "wss://api.mainnet-beta.solana.com", - # "wss://solana-api.projectserum.com", - # "wss://rpc.ankr.com/solana", - # "wss://mainnet.rpcpool.com", -] -PING_INTERVAL = 30 -SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes - - -# async def heartbeat(websocket): -# while True: -# try: -# await websocket.ping() -# await asyncio.sleep(PING_INTERVAL) -# except websockets.exceptions.ConnectionClosed: -# break - -_first_subscription = True -_process_task = None -async def wallet_watch_loop(): - global _first_subscription, _process_task - reconnect_delay = 5 - max_reconnect_delay = 60 - - while True: - try: - try: - subscription_id = None - current_url = random.choice(SOLANA_ENDPOINTS) - async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket: - logger.info(f"Connected to Solana websocket: {current_url}") - # heartbeat_task = asyncio.create_task(heartbeat(websocket)) - - while True: - if websocket.closed: - break - - subscription_id = await subscribe(websocket) - if subscription_id is not None: - await send_telegram_message(f"Solana mainnet connected ({subscription_id})...") - if _first_subscription: - asyncio.create_task( list_initial_wallet_states()) - _first_subscription = False - _process_task = asyncio.create_task(process_messages(websocket, subscription_id)) - while True: - try:# drop subscription now - await process_messages(websocket, subscription_id) - # await asyncio.run(_process_task) - # await asyncio.wait_for(_process_task, timeout=SUBSCRIBE_INTERVAL) - except asyncio.TimeoutError: - # Timeout occurred, time to resubscribe - if not PROCESSING_LOG: - _process_task.cancel() - try: - await _process_task - except asyncio.CancelledError: - pass - await unsubscribe(websocket, subscription_id) - new_sub_id = await subscribe(websocket) - if new_sub_id is None: break - if new_sub_id > 1: # we sometimes get True instead of integer, so we cje - subscription_id = new_sub_id - logger.info(f"New subscription created with ID: {subscription_id}") - elif new_sub_id is True: - # Already subscribed - logger.info("Already subscribed, continuing with existing subscription") - if subscription_id: - process_task = asyncio.create_task(process_messages(websocket, subscription_id)) - - else: - # process_messages completed (shouldn't happen unless there's an error) - break - else: - send_telegram_message("Failed to connect. Retrying...") - - # heartbeat_task.cancel() - - except websockets.exceptions.WebSocketException as e: - logger.error(f"WebSocket error: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - - await unsubscribe(websocket, subscription_id) - await send_telegram_message("reconnecting...") - logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") - websocket.close() - except Exception as e: - logger.error(f"An unexpected error occurred - breaking watch loop: {e}") - - await asyncio.sleep(reconnect_delay) - reconnect_delay = min(reconnect_delay * 1.2, max_reconnect_delay) - -async def subscribe(websocket): - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsSubscribe", - "params": [ - {"mentions": [FOLLOWED_WALLET]}, - {"commitment": "confirmed"} - ] - } - try: - await websocket.send(json.dumps(request)) - logger.info("Subscription request sent") - - response = await websocket.recv() - response_data = json.loads(response) - - if 'result' in response_data: - subscription_id = response_data['result'] - logger.info(f"Subscription successful. Subscription id: {subscription_id}") - return subscription_id - else: - logger.warning(f"Unexpected response: {response_data}") - return None - except websockets.exceptions.ConnectionClosedError as e: - logger.error(f"Connection closed unexpectedly: {e}") - await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") - await websocket.close() - return None - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - return None - -async def unsubscribe(websocket, subscription_id): - if subscription_id: - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsUnsubscribe", - "params": [subscription_id] - } - await websocket.send(json.dumps(request)) - logger.info(f"Unsubscribed from subscription id: {subscription_id}") - subscription_id = None - -async def process_messages(websocket, subscription_id): - try: - while True: - response = await websocket.recv() - response_data = json.loads(response) - logger.debug(f"Received response: {response_data}") - - if 'params' in response_data: - log = response_data['params']['result'] - logger.debug(f"Received transaction log: {log}") - asyncio.create_task(process_log(log)) - else: - logger.warning(f"Unexpected response: {response_data}") - - except websockets.exceptions.ConnectionClosedError as e: - logger.error(f"Connection closed unexpectedly: {e}") - await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") - pass - except json.JSONDecodeError as e: - logger.error(f"Failed to decode JSON: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - pk = os.getenv("PK") async def check_PK(): diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index ee4793b..661a86e 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -13,8 +13,14 @@ import datetime logger = logging.getLogger(__name__) -SOLANA_ENDPOINTS = ["your_endpoint_1", "your_endpoint_2"] # Add your endpoints here -SUBSCRIBE_INTERVAL = 300 # 5 minutes in seconds +SOLANA_ENDPOINTS = [ + "wss://api.mainnet-beta.solana.com", + # "wss://solana-api.projectserum.com", + # "wss://rpc.ankr.com/solana", + # "wss://mainnet.rpcpool.com", +] +PING_INTERVAL = 30 +SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes from config import ( FOLLOWED_WALLET, SOLANA_HTTP_URL @@ -23,9 +29,9 @@ FOLLOWED_WALLET, SOLANA_HTTP_URL class SolanaAPI: def __init__(self): - self.websocket: Optional[websockets.WebSocketClientProtocol] = None - self.subscription_id: Optional[int] = None - self.message_queue: asyncio.Queue = asyncio.Queue() + self.websocket = None + self.subscription_id = None + self.message_queue = asyncio.Queue() async def connect(self): while True: @@ -38,37 +44,50 @@ class SolanaAPI: logger.error(f"Failed to connect to {current_url}: {e}") await asyncio.sleep(5) - async def subscribe(self): + async def ws_jsonrpc(self, method, params=None): + if not isinstance(params, list): + params = [params] if params is not None else [] + request = { "jsonrpc": "2.0", "id": 1, - "method": "logsSubscribe", - "params": [ - {"mentions": [FOLLOWED_WALLET]}, - {"commitment": "confirmed"} - ] + "method": method, + "params": params } + await self.websocket.send(json.dumps(request)) response = await self.websocket.recv() response_data = json.loads(response) - + if 'result' in response_data: - self.subscription_id = response_data['result'] - logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") + return response_data['result'] + elif 'error' in response_data: + logger.error(f"Error in WebSocket RPC call: {response_data['error']}") + return None else: logger.warning(f"Unexpected response: {response_data}") + return None + + async def subscribe(self): + params = [ + {"mentions": [FOLLOWED_WALLET]}, + {"commitment": "confirmed"} + ] + result = await self.ws_jsonrpc("logsSubscribe", params) + if result is not None: + self.subscription_id = result + logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") + else: + logger.error("Failed to subscribe") async def unsubscribe(self): if self.subscription_id: - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsUnsubscribe", - "params": [self.subscription_id] - } - await self.websocket.send(json.dumps(request)) - logger.info(f"Unsubscribed from subscription id: {self.subscription_id}") - self.subscription_id = None + result = await self.ws_jsonrpc("logsUnsubscribe", [self.subscription_id]) + if result: + logger.info(f"Unsubscribed from subscription id: {self.subscription_id}") + self.subscription_id = None + else: + logger.error(f"Failed to unsubscribe from subscription id: {self.subscription_id}") async def receive_messages(self): while True: @@ -83,6 +102,16 @@ class SolanaAPI: break async def process_messages(self): + while True: + message = await self.message_queue.get() + # Process the message here + # You can add your message processing logic + logger.info(f"Received message: {message}") + + async def close(self): + if self.websocket: + await self.websocket.close() + logger.info("WebSocket connection closed") while True: message = await self.message_queue.get() try: diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py index 684e99d..b70b567 100644 --- a/crypto/sol/modules/utils.py +++ b/crypto/sol/modules/utils.py @@ -9,19 +9,35 @@ from telegram import Bot from telegram.constants import ParseMode from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME -# Initialize Telegram Bot -# Create a custom connection pool -conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit -timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout +class TelegramUtils: + def __init__(self): + self.bot = None + self.conn_pool = None + self.timeout = None - # bot = Bot(TELEGRAM_BOT_TOKEN) # , request=aiohttp.ClientSession(connector=conn_pool, timeout=timeout).request) -bot = Bot(token=TELEGRAM_BOT_TOKEN) + async def initialize(self): + # Create a custom connection pool + self.conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit + self.timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout -async def send_telegram_message(message): - try: - await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) - logging.info(f"Telegram message sent: {message}") - except Exception as e: - logging.error(f"Error sending Telegram message: {str(e)}") + # Initialize Telegram Bot + self.bot = Bot(token=TELEGRAM_BOT_TOKEN) -# You can add more Telegram-related functions here if needed \ No newline at end of file + async def send_telegram_message(self, message): + if not self.bot: + await self.initialize() + + try: + await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) + logging.info(f"Telegram message sent: {message}") + except Exception as e: + logging.error(f"Error sending Telegram message: {str(e)}") + + async def close(self): + if self.conn_pool: + await self.conn_pool.close() + +# Create a global instance of TelegramUtils +telegram_utils = TelegramUtils() + +# You can add more Telegram-related methods to the TelegramUtils class if needed \ No newline at end of file