diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index 661a86e..bc19389 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -26,12 +26,14 @@ from config import ( FOLLOWED_WALLET, SOLANA_HTTP_URL ) +from modules.utils import telegram_utils -class SolanaAPI: - def __init__(self): +class SolanaWS: + def __init__(self, on_message: Optional[callable] = None): self.websocket = None self.subscription_id = None self.message_queue = asyncio.Queue() + self.on_message = on_message async def connect(self): while True: @@ -104,8 +106,7 @@ class SolanaAPI: async def process_messages(self): while True: message = await self.message_queue.get() - # Process the message here - # You can add your message processing logic + await self.on_message(message) logger.info(f"Received message: {message}") async def close(self): @@ -171,98 +172,97 @@ async def solana_jsonrpc(method, params = None, jsonParsed = True): logging.error(f"Error fetching data from Solana RPC: {e}") return None - -async def process_log(log): - # Implement your log processing logic here - pass - -async def send_telegram_message(message): - # Implement your Telegram message sending logic here - pass - -async def list_initial_wallet_states(): - # Implement your initial wallet state listing logic here - pass - -async def wallet_watch_loop(): - solana_ws = SolanaAPI() - first_subscription = True - - while True: - try: - await solana_ws.connect() - await solana_ws.subscribe() - - if first_subscription: - asyncio.create_task(list_initial_wallet_states()) - first_subscription = False - - await send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") - - receive_task = asyncio.create_task(solana_ws.receive_messages()) - process_task = asyncio.create_task(solana_ws.process_messages()) - - try: - await asyncio.gather(receive_task, process_task) - except asyncio.CancelledError: - pass - finally: - receive_task.cancel() - process_task.cancel() - - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - finally: - await solana_ws.unsubscribe() - if solana_ws.websocket: - await solana_ws.websocket.close() - await send_telegram_message("Reconnecting...") - await asyncio.sleep(5) - -# Example usage -# async def main(): -# account_address = "Vote111111111111111111111111111111111111111" +class SolanaAPI: -async def get_last_transactions(account_address, check_interval=300, limit=1000): - last_check_time = None - last_signature = None + def __init__(self, process_log_callback, send_telegram_message_callback, list_initial_wallet_states_callback): + self.process_log = process_log_callback + self.list_initial_wallet_states = list_initial_wallet_states_callback - while True: - current_time = datetime.now() + async def process_messages(self, solana_ws): + while True: + message = await solana_ws.message_queue.get() + await self.process_log(message) + + async def wallet_watch_loop(): + solana_ws = SolanaWS(on_message=process_log) + first_subscription = True - if last_check_time is None or (current_time - last_check_time).total_seconds() >= check_interval: - params = [ - account_address, - { - "limit": limit - } - ] + while True: + try: + await solana_ws.connect() + await solana_ws.subscribe() - if last_signature: - params[1]["before"] = last_signature + if first_subscription: + asyncio.create_task(self.list_initial_wallet_states()) + first_subscription = False - result = await solana_jsonrpc("getSignaturesForAddress", params) + await telegram_utils.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") - if result: - for signature in result: - if last_signature and signature['signature'] == last_signature: - break + receive_task = asyncio.create_task(solana_ws.receive_messages()) + process_task = asyncio.create_task(solana_ws.process_messages()) - # Process the transaction - await process_transaction(signature) + try: + await asyncio.gather(receive_task, process_task) + except asyncio.CancelledError: + pass + finally: + receive_task.cancel() + process_task.cancel() + + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + finally: + await solana_ws.unsubscribe() + if solana_ws.websocket: + await solana_ws.websocket.close() + await telegram_utils.send_telegram_message("Reconnecting...") + await asyncio.sleep(5) + + async def process_transaction(signature): + # Implement your logic to process each transaction + print(f"Processing transaction: {signature['signature']}") + # You can add more processing logic here, such as storing in a database, + # triggering notifications, etc. + # Example usage + # async def main(): + # account_address = "Vote111111111111111111111111111111111111111" + + async def get_last_transactions(account_address, check_interval=300, limit=1000): + last_check_time = None + last_signature = None + + while True: + current_time = datetime.now() + + if last_check_time is None or (current_time - last_check_time).total_seconds() >= check_interval: + params = [ + account_address, + { + "limit": limit + } + ] + + if last_signature: + params[1]["before"] = last_signature + + result = await solana_jsonrpc("getSignaturesForAddress", params) if result: - last_signature = result[0]['signature'] + for signature in result: + if last_signature and signature['signature'] == last_signature: + break - last_check_time = current_time + # Process the transaction + await process_transaction(signature) + + if result: + last_signature = result[0]['signature'] + + last_check_time = current_time + + await asyncio.sleep(1) # Sleep for 1 second before checking again - await asyncio.sleep(1) # Sleep for 1 second before checking again -async def process_transaction(signature): - # Implement your logic to process each transaction - print(f"Processing transaction: {signature['signature']}") - # You can add more processing logic here, such as storing in a database, - # triggering notifications, etc. if __name__ == "__main__": asyncio.run(wallet_watch_loop()) \ No newline at end of file