This commit is contained in:
Dobromir Popov 2024-10-16 11:50:59 +03:00
parent d63d3d41bc
commit 296c4dcf81

View File

@ -34,6 +34,7 @@ class SolanaWS:
self.subscription_id = None self.subscription_id = None
self.message_queue = asyncio.Queue() self.message_queue = asyncio.Queue()
self.on_message = on_message self.on_message = on_message
self.websocket = None
async def connect(self): async def connect(self):
while True: while True:
@ -46,7 +47,7 @@ class SolanaWS:
logger.error(f"Failed to connect to {current_url}: {e}") logger.error(f"Failed to connect to {current_url}: {e}")
await asyncio.sleep(5) await asyncio.sleep(5)
async def ws_jsonrpc(self, method, params=None): async def ws_jsonrpc(self, ws, method, params=None, doProcessResponse = True):
if not isinstance(params, list): if not isinstance(params, list):
params = [params] if params is not None else [] params = [params] if params is not None else []
@ -57,7 +58,10 @@ class SolanaWS:
"params": params "params": params
} }
await self.websocket.send(json.dumps(request)) await ws.send(json.dumps(request))
if not doProcessResponse:
return None
else:
response = await self.websocket.recv() response = await self.websocket.recv()
response_data = json.loads(response) response_data = json.loads(response)
@ -75,7 +79,8 @@ class SolanaWS:
{"mentions": [FOLLOWED_WALLET]}, {"mentions": [FOLLOWED_WALLET]},
{"commitment": "confirmed"} {"commitment": "confirmed"}
] ]
result = await self.ws_jsonrpc("logsSubscribe", params) result = await self.ws_jsonrpc("logsSubscribe", params, doProcessResponse=False)
response = process_messages(self.websocket)
if result is not None: if result is not None:
self.subscription_id = result self.subscription_id = result
logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") logger.info(f"Subscription successful. Subscription id: {self.subscription_id}")
@ -171,7 +176,7 @@ class SolanaAPI:
asyncio.create_task(self.on_initial_subscription()) asyncio.create_task(self.on_initial_subscription())
first_subscription = False first_subscription = False
await self.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") await self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...")
receive_task = asyncio.create_task(solana_ws.receive_messages()) receive_task = asyncio.create_task(solana_ws.receive_messages())
process_task = asyncio.create_task(solana_ws.process_messages()) process_task = asyncio.create_task(solana_ws.process_messages())
@ -190,7 +195,7 @@ class SolanaAPI:
await solana_ws.unsubscribe() await solana_ws.unsubscribe()
if solana_ws.websocket: if solana_ws.websocket:
await solana_ws.close() await solana_ws.close()
await self.send_telegram_message("Reconnecting...") await self.on_bot_message("Reconnecting...")
await asyncio.sleep(5) await asyncio.sleep(5)
async def process_transaction(self, signature): async def process_transaction(self, signature):
@ -416,7 +421,7 @@ class SolanaDEX:
remaining_tokens = [addr for addr in token_addresses if addr not in prices] remaining_tokens = [addr for addr in token_addresses if addr not in prices]
# Try CoinGecko # Try CoinGecko
coingecko_prices = await get_prices_from_coingecko(remaining_tokens) coingecko_prices = await self.get_prices_from_coingecko(remaining_tokens)
prices.update(coingecko_prices) prices.update(coingecko_prices)