refactoring
This commit is contained in:
@ -74,7 +74,8 @@ class SolanaWS:
|
||||
self.message_queue = asyncio.Queue()
|
||||
self.on_message = on_message
|
||||
self.websocket = None
|
||||
|
||||
self.last_msg_responded = False
|
||||
|
||||
async def connect(self):
|
||||
while True:
|
||||
try:
|
||||
@ -85,8 +86,10 @@ class SolanaWS:
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to {current_url}: {e}")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
|
||||
async def ws_jsonrpc(self, ws, method, params=None, doProcessResponse = True):
|
||||
async def ws_jsonrpc(self, method, params=None, doProcessResponse = True):
|
||||
|
||||
if not isinstance(params, list):
|
||||
params = [params] if params is not None else []
|
||||
|
||||
@ -96,14 +99,15 @@ class SolanaWS:
|
||||
"method": method,
|
||||
"params": params
|
||||
}
|
||||
|
||||
await ws.send(json.dumps(request))
|
||||
self.last_msg_responded = False
|
||||
await self.websocket.send(json.dumps(request))
|
||||
if not doProcessResponse:
|
||||
return None
|
||||
else:
|
||||
response = await self.websocket.recv()
|
||||
response_data = json.loads(response)
|
||||
|
||||
self.last_msg_responded = True
|
||||
|
||||
if 'result' in response_data:
|
||||
return response_data['result']
|
||||
elif 'error' in response_data:
|
||||
@ -112,17 +116,21 @@ class SolanaWS:
|
||||
else:
|
||||
logger.warning(f"Unexpected response: {response_data}")
|
||||
return None
|
||||
|
||||
|
||||
async def subscribe(self):
|
||||
params = [
|
||||
{"mentions": [FOLLOWED_WALLET]},
|
||||
{"commitment": "confirmed"}
|
||||
]
|
||||
result = await self.ws_jsonrpc("logsSubscribe", params, doProcessResponse=False)
|
||||
response = process_messages(self.websocket)
|
||||
if result is not None:
|
||||
# define onmessage as inline callback to get subscription_id which waits for last_msg_responded
|
||||
# self.on_message = lambda message: self.subscription_id = message.get('result')
|
||||
result = await self.ws_jsonrpc("logsSubscribe", params)
|
||||
|
||||
if result is not None and result > 0:
|
||||
self.subscription_id = result
|
||||
logger.info(f"Subscription successful. Subscription id: {self.subscription_id}")
|
||||
elif result:
|
||||
logger.error("already subscribed")
|
||||
else:
|
||||
logger.error("Failed to subscribe")
|
||||
|
||||
@ -190,8 +198,13 @@ class SolanaWS:
|
||||
class SolanaAPI:
|
||||
def __init__(self, process_transaction_callback = None, on_initial_subscription_callback = None, on_bot_message=None):
|
||||
self.process_transaction = process_transaction_callback
|
||||
self.on_initial_subscription = on_initial_subscription_callback
|
||||
self.on_bot_message = on_bot_message,
|
||||
self.on_initial_subscription = on_initial_subscription_callback
|
||||
# if callable(on_initial_subscription_callback) else lambda: None
|
||||
|
||||
# Define a default lambda function for on_bot_message
|
||||
default_on_bot_message = lambda message: logger.info(f"Bot message: {message}")
|
||||
# Use the provided on_bot_message if it's callable, otherwise use the default
|
||||
self.on_bot_message = on_bot_message if callable(on_bot_message) else default_on_bot_message
|
||||
|
||||
self.dex = SolanaDEX(DISPLAY_CURRENCY)
|
||||
self.solana_ws = SolanaWS(on_message=self.process_transaction)
|
||||
@ -201,6 +214,7 @@ class SolanaAPI:
|
||||
message = await solana_ws.message_queue.get()
|
||||
await self.process_transaction(message)
|
||||
|
||||
|
||||
_first_subscription = True
|
||||
async def wallet_watch_loop(self):
|
||||
|
||||
@ -212,11 +226,11 @@ class SolanaAPI:
|
||||
await solana_ws.connect()
|
||||
await solana_ws.subscribe()
|
||||
|
||||
if first_subscription:
|
||||
asyncio.create_task(self.on_initial_subscription())
|
||||
if first_subscription and self.on_initial_subscription is not None:
|
||||
await self.on_initial_subscription
|
||||
first_subscription = False
|
||||
|
||||
await self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...")
|
||||
self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...")
|
||||
|
||||
receive_task = asyncio.create_task(solana_ws.receive_messages())
|
||||
process_task = asyncio.create_task(solana_ws.process_messages())
|
||||
@ -235,7 +249,8 @@ class SolanaAPI:
|
||||
await solana_ws.unsubscribe()
|
||||
if solana_ws.websocket:
|
||||
await solana_ws.close()
|
||||
await self.on_bot_message("Reconnecting...")
|
||||
if self.on_bot_message:
|
||||
await self.on_bot_message("Reconnecting...")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
async def get_last_transactions(self, account_address, check_interval=300, limit=1000):
|
||||
@ -862,4 +877,4 @@ async def save_token_info():
|
||||
json.dump(TOKENS_INFO, f, indent=2)
|
||||
|
||||
|
||||
SAPI = SolanaAPI()
|
||||
SAPI = SolanaAPI( on_initial_subscription_callback=SolanaDEX.list_initial_wallet_states())
|
Reference in New Issue
Block a user