diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 2ecccf8..6efbe72 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -653,7 +653,7 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False): account_data_info = account_data_data['parsed']['info'] if 'mint' in account_data_info: transfer['mint'] = account_data_info['mint'] - if 'decimals' not in TOKENS_INFO[transfer['mint']]: + if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]: await get_token_metadata_symbol(transfer['mint']) # get actual prices current_price = await get_token_prices([transfer['mint']]) @@ -833,14 +833,17 @@ async def save_log(log): logging.error(f"Error saving RPC log: {e}") - +PROCESSING_LOG = False async def process_log(log_result): + global PROCESSING_LOG + if log_result['value']['err']: return logs = log_result['value']['logs'] try: # Detect swap operations in logs + PROCESSING_LOG = True swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', 'Program log: Instruction: SwapExactAmountIn'] if any(op in logs for op in swap_operations): @@ -944,11 +947,13 @@ async def process_log(log_result): logging.error(f"Error aquiring log details and following: {e}") await send_telegram_message(f"Not followed! Error following move.") + except Exception as e: logging.error(f"Error processing log: {e}") + PROCESSING_LOG = False return tr_details # "Program log: Instruction: Swap2", # "Program log: order_id: 13985890735038016", @@ -1018,6 +1023,7 @@ async def follow_move(move): # Calculate the amount to swap based on the same percentage as the followed move amount_to_swap = your_balance * (move['percentage_swapped'] / 100) + amount_to_swap = min( min(amount_to_swap, your_balance), 300) # # always get 99% of the amount to swap # amount_to_swap = amount_to_swap * 0.95 @@ -1134,67 +1140,86 @@ SOLANA_ENDPOINTS = [ # "wss://mainnet.rpcpool.com", ] PING_INTERVAL = 30 -SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 10 minutes +SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes -async def heartbeat(websocket): - while True: - try: - await websocket.ping() - await asyncio.sleep(PING_INTERVAL) - except websockets.exceptions.ConnectionClosed: - break +# async def heartbeat(websocket): +# while True: +# try: +# await websocket.ping() +# await asyncio.sleep(PING_INTERVAL) +# except websockets.exceptions.ConnectionClosed: +# break first_subscription = True -async def subscribe_to_wallet(): +async def wallet_watch_loop(): global first_subscription reconnect_delay = 5 max_reconnect_delay = 60 while True: try: - current_url = random.choice(SOLANA_ENDPOINTS) - async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket: - logger.info(f"Connected to Solana websocket: {current_url}") - heartbeat_task = asyncio.create_task(heartbeat(websocket)) - if first_subscription: - asyncio.create_task( list_initial_wallet_states()) - first_subscription = False + try: + subscription_id = None + current_url = random.choice(SOLANA_ENDPOINTS) + async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket: + logger.info(f"Connected to Solana websocket: {current_url}") + # heartbeat_task = asyncio.create_task(heartbeat(websocket)) - while True: - subscription_id = await subscribe(websocket) - if subscription_id: - await send_telegram_message(f"Solana mainnet connected ({subscription_id})...") - process_task = asyncio.create_task(process_messages(websocket, subscription_id)) - while True: - try: - await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL) - except asyncio.TimeoutError: - # Timeout occurred, time to resubscribe - process_task.cancel() + while True: + if websocket.closed: + break + + subscription_id = await subscribe(websocket) + if subscription_id is not None: + await send_telegram_message(f"Solana mainnet connected ({subscription_id})...") + if first_subscription: + asyncio.create_task( list_initial_wallet_states()) + first_subscription = False + process_task = asyncio.create_task(process_messages(websocket, subscription_id)) + while True: try: - await process_task - except asyncio.CancelledError: - pass - await unsubscribe(websocket, subscription_id) - subscription_id = await subscribe(websocket) - if subscription_id: - process_task = asyncio.create_task(process_messages(websocket, subscription_id)) + await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL) + except asyncio.TimeoutError: + # Timeout occurred, time to resubscribe + if not PROCESSING_LOG: + process_task.cancel() + try: + await process_task + except asyncio.CancelledError: + pass + await unsubscribe(websocket, subscription_id) + new_sub_id = await subscribe(websocket) + if new_sub_id is None: break + if new_sub_id > 1: # we sometimes get True instead of integer, so we cje + subscription_id = new_sub_id + logger.info(f"New subscription created with ID: {subscription_id}") + elif new_sub_id is True: + # Already subscribed + logger.info("Already subscribed, continuing with existing subscription") + if subscription_id: + process_task = asyncio.create_task(process_messages(websocket, subscription_id)) + else: + # process_messages completed (shouldn't happen unless there's an error) break - else: - # process_messages completed (shouldn't happen unless there's an error) - break + else: + send_telegram_message("Failed to connect. Retrying...") - heartbeat_task.cancel() + # heartbeat_task.cancel() - except websockets.exceptions.WebSocketException as e: - logger.error(f"WebSocket error: {e}") + except websockets.exceptions.WebSocketException as e: + logger.error(f"WebSocket error: {e}") + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + + await unsubscribe(websocket, subscription_id) + await send_telegram_message("reconnecting...") + logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") + websocket.close() except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - await unsubscribe(websocket, subscription_id) - await send_telegram_message("reconnecting...") - logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") + logger.error(f"An unexpected error occurred - breaking watch loop: {e}") + await asyncio.sleep(reconnect_delay) reconnect_delay = min(reconnect_delay * 1.2, max_reconnect_delay) @@ -1208,19 +1233,27 @@ async def subscribe(websocket): {"commitment": "confirmed"} ] } - - await websocket.send(json.dumps(request)) - logger.info("Subscription request sent") + try: + await websocket.send(json.dumps(request)) + logger.info("Subscription request sent") - response = await websocket.recv() - response_data = json.loads(response) - - if 'result' in response_data: - subscription_id = response_data['result'] - logger.info(f"Subscription successful. Subscription id: {subscription_id}") - return subscription_id - else: - logger.warning(f"Unexpected response: {response_data}") + response = await websocket.recv() + response_data = json.loads(response) + + if 'result' in response_data: + subscription_id = response_data['result'] + logger.info(f"Subscription successful. Subscription id: {subscription_id}") + return subscription_id + else: + logger.warning(f"Unexpected response: {response_data}") + return None + except websockets.exceptions.ConnectionClosedError as e: + logger.error(f"Connection closed unexpectedly: {e}") + await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") + await websocket.close() + return None + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") return None async def unsubscribe(websocket, subscription_id): @@ -1233,6 +1266,7 @@ async def unsubscribe(websocket, subscription_id): } await websocket.send(json.dumps(request)) logger.info(f"Unsubscribed from subscription id: {subscription_id}") + subscription_id = None async def process_messages(websocket, subscription_id): try: @@ -1285,7 +1319,7 @@ async def check_PK(): async def main(): await send_telegram_message("Solana Agent Started. Connecting to mainnet...") await check_PK() - await subscribe_to_wallet() + await wallet_watch_loop() async def run_flask(): loop = asyncio.get_running_loop()