diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index e0fa5cb..7f5cda2 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -34,6 +34,7 @@ class SolanaWS: self.subscription_id = None self.message_queue = asyncio.Queue() self.on_message = on_message + self.websocket = None async def connect(self): while True: @@ -46,7 +47,7 @@ class SolanaWS: logger.error(f"Failed to connect to {current_url}: {e}") await asyncio.sleep(5) - async def ws_jsonrpc(self, method, params=None): + async def ws_jsonrpc(self, ws, method, params=None, doProcessResponse = True): if not isinstance(params, list): params = [params] if params is not None else [] @@ -57,25 +58,29 @@ class SolanaWS: "params": params } - await self.websocket.send(json.dumps(request)) - response = await self.websocket.recv() - response_data = json.loads(response) - - if 'result' in response_data: - return response_data['result'] - elif 'error' in response_data: - logger.error(f"Error in WebSocket RPC call: {response_data['error']}") + await ws.send(json.dumps(request)) + if not doProcessResponse: return None else: - logger.warning(f"Unexpected response: {response_data}") - return None + response = await self.websocket.recv() + response_data = json.loads(response) + + if 'result' in response_data: + return response_data['result'] + elif 'error' in response_data: + logger.error(f"Error in WebSocket RPC call: {response_data['error']}") + return None + else: + logger.warning(f"Unexpected response: {response_data}") + return None async def subscribe(self): params = [ {"mentions": [FOLLOWED_WALLET]}, {"commitment": "confirmed"} ] - result = await self.ws_jsonrpc("logsSubscribe", params) + result = await self.ws_jsonrpc("logsSubscribe", params, doProcessResponse=False) + response = process_messages(self.websocket) if result is not None: self.subscription_id = result logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") @@ -171,7 +176,7 @@ class SolanaAPI: asyncio.create_task(self.on_initial_subscription()) first_subscription = False - await self.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") + await self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") receive_task = asyncio.create_task(solana_ws.receive_messages()) process_task = asyncio.create_task(solana_ws.process_messages()) @@ -190,7 +195,7 @@ class SolanaAPI: await solana_ws.unsubscribe() if solana_ws.websocket: await solana_ws.close() - await self.send_telegram_message("Reconnecting...") + await self.on_bot_message("Reconnecting...") await asyncio.sleep(5) async def process_transaction(self, signature): @@ -416,7 +421,7 @@ class SolanaDEX: remaining_tokens = [addr for addr in token_addresses if addr not in prices] # Try CoinGecko - coingecko_prices = await get_prices_from_coingecko(remaining_tokens) + coingecko_prices = await self.get_prices_from_coingecko(remaining_tokens) prices.update(coingecko_prices)