From 91e3a18a4d752030b3dff0d32c1ae8afc20b98ec Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Fri, 11 Oct 2024 15:08:08 +0300 Subject: [PATCH] better ws and mainnet subscriptions --- crypto/sol/app.py | 102 ++++++++++++++++++++++++++++++---------------- 1 file changed, 66 insertions(+), 36 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 2c6c1fa..161fbca 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -1045,36 +1045,46 @@ async def follow_move(move): except Exception as e: logging.error(f"Error sending notification: {e}") - for retry in range(2): - private_key = Keypair.from_bytes(base58.b58decode(pk)) - async_client = AsyncClient(SOLANA_WS_URL) - jupiter = Jupiter(async_client, private_key) - transaction_data = await jupiter.swap( - input_mint=move['token_in'], - output_mint=move['token_out'], - amount=amount, - slippage_bps=100, # Increased to 1% - ) - logging.info(f"Initiating move. Transaction data:\n {transaction_data}") - error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}") - raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data)) - signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message)) - signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature]) - opts = TxOpts(skip_preflight=False, preflight_commitment=Processed) + for retry in range(3): + try: + private_key = Keypair.from_bytes(base58.b58decode(pk)) + async_client = AsyncClient(SOLANA_WS_URL) + jupiter = Jupiter(async_client, private_key) + transaction_data = await jupiter.swap( + input_mint=move['token_in'], + output_mint=move['token_out'], + amount=amount, + slippage_bps=100, # Increased to 1% + ) + logging.info(f"Initiating move. Transaction data:\n {transaction_data}") + error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}") + raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data)) + signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message)) + signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature]) + opts = TxOpts(skip_preflight=False, preflight_commitment=Processed) - # send the transaction - result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts) + # send the transaction + result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts) - transaction_id = json.loads(result.to_json())['result'] - print(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}") - await send_telegram_message(f"Follow Transaction Sent: {transaction_id}") - tx_details = await get_transaction_details_with_retry(transaction_id) - - if tx_details is not None: - break - else: - logging.warning(f"Failed to get transaction details for {transaction_id}. Probably transaction failed. Retrying again...") - await asyncio.sleep(5) + transaction_id = json.loads(result.to_json())['result'] + print(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}") + await send_telegram_message(f"Follow Transaction Sent: {transaction_id}") + tx_details = await get_transaction_details_with_retry(transaction_id) + + if tx_details is not None: + break + else: + logging.warning(f"Failed to get transaction details for {transaction_id}. Probably transaction failed. Retrying again...") + await asyncio.sleep(3) + except Exception as e: + error_message = f"Move Failed:\n{str(e)}" + logging.error(error_message) + # log the errors to /logs/errors.log + error_logger.error(error_message) + error_logger.exception(e) + await send_telegram_message(error_message) + amount = amount * 0.75 + await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) try: @@ -1106,8 +1116,12 @@ async def follow_move(move): logging.error(error_message) # log the errors to /logs/errors.log error_logger.error(error_message) - error_logger.exception(e) - await send_telegram_message(error_message) + error_logger.exception(e) \ + # if error_message contains 'Program log: Error: insufficient funds' + if 'insufficient funds' in error_message: + await send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.") + else: + await send_telegram_message(error_message) # Helper functions @@ -1118,7 +1132,7 @@ SOLANA_ENDPOINTS = [ # "wss://mainnet.rpcpool.com", ] PING_INTERVAL = 30 -SUBSCRIBE_INTERVAL = 180 # Resubscribe every 3 minutes +SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 10 minutes async def heartbeat(websocket): @@ -1148,10 +1162,26 @@ async def subscribe_to_wallet(): while True: subscription_id = await subscribe(websocket) if subscription_id: - await process_messages(websocket, subscription_id) - - await asyncio.sleep(SUBSCRIBE_INTERVAL) - await unsubscribe(websocket, subscription_id) + process_task = asyncio.create_task(process_messages(websocket, subscription_id)) + while True: + try: + await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL) + except asyncio.TimeoutError: + # Timeout occurred, time to resubscribe + process_task.cancel() + try: + await process_task + except asyncio.CancelledError: + pass + await unsubscribe(websocket, subscription_id) + subscription_id = await subscribe(websocket) + if subscription_id: + process_task = asyncio.create_task(process_messages(websocket, subscription_id)) + else: + break + else: + # process_messages completed (shouldn't happen unless there's an error) + break heartbeat_task.cancel() @@ -1184,7 +1214,7 @@ async def subscribe(websocket): if 'result' in response_data: subscription_id = response_data['result'] logger.info(f"Subscription successful. Subscription id: {subscription_id}") - await send_telegram_message("Connected to Solana network. Watching for transactions now.") + await send_telegram_message("Solana mainnet connected... Listening for transactions.") return subscription_id else: logger.warning(f"Unexpected response: {response_data}")