better ws and mainnet subscriptions

This commit is contained in:
Dobromir Popov 2024-10-11 15:08:08 +03:00
parent b473d0921f
commit 91e3a18a4d

View File

@ -1045,36 +1045,46 @@ async def follow_move(move):
except Exception as e: except Exception as e:
logging.error(f"Error sending notification: {e}") logging.error(f"Error sending notification: {e}")
for retry in range(2): for retry in range(3):
private_key = Keypair.from_bytes(base58.b58decode(pk)) try:
async_client = AsyncClient(SOLANA_WS_URL) private_key = Keypair.from_bytes(base58.b58decode(pk))
jupiter = Jupiter(async_client, private_key) async_client = AsyncClient(SOLANA_WS_URL)
transaction_data = await jupiter.swap( jupiter = Jupiter(async_client, private_key)
input_mint=move['token_in'], transaction_data = await jupiter.swap(
output_mint=move['token_out'], input_mint=move['token_in'],
amount=amount, output_mint=move['token_out'],
slippage_bps=100, # Increased to 1% amount=amount,
) slippage_bps=100, # Increased to 1%
logging.info(f"Initiating move. Transaction data:\n {transaction_data}") )
error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}") logging.info(f"Initiating move. Transaction data:\n {transaction_data}")
raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data)) error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}")
signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message)) raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature]) signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message))
opts = TxOpts(skip_preflight=False, preflight_commitment=Processed) signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
# send the transaction # send the transaction
result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts) result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
transaction_id = json.loads(result.to_json())['result'] transaction_id = json.loads(result.to_json())['result']
print(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}") print(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}")
await send_telegram_message(f"Follow Transaction Sent: {transaction_id}") await send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
tx_details = await get_transaction_details_with_retry(transaction_id) tx_details = await get_transaction_details_with_retry(transaction_id)
if tx_details is not None: if tx_details is not None:
break break
else: else:
logging.warning(f"Failed to get transaction details for {transaction_id}. Probably transaction failed. Retrying again...") logging.warning(f"Failed to get transaction details for {transaction_id}. Probably transaction failed. Retrying again...")
await asyncio.sleep(5) await asyncio.sleep(3)
except Exception as e:
error_message = f"<b>Move Failed:</b>\n{str(e)}"
logging.error(error_message)
# log the errors to /logs/errors.log
error_logger.error(error_message)
error_logger.exception(e)
await send_telegram_message(error_message)
amount = amount * 0.75
await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
try: try:
@ -1106,8 +1116,12 @@ async def follow_move(move):
logging.error(error_message) logging.error(error_message)
# log the errors to /logs/errors.log # log the errors to /logs/errors.log
error_logger.error(error_message) error_logger.error(error_message)
error_logger.exception(e) error_logger.exception(e) \
await send_telegram_message(error_message) # if error_message contains 'Program log: Error: insufficient funds'
if 'insufficient funds' in error_message:
await send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.")
else:
await send_telegram_message(error_message)
# Helper functions # Helper functions
@ -1118,7 +1132,7 @@ SOLANA_ENDPOINTS = [
# "wss://mainnet.rpcpool.com", # "wss://mainnet.rpcpool.com",
] ]
PING_INTERVAL = 30 PING_INTERVAL = 30
SUBSCRIBE_INTERVAL = 180 # Resubscribe every 3 minutes SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 10 minutes
async def heartbeat(websocket): async def heartbeat(websocket):
@ -1148,10 +1162,26 @@ async def subscribe_to_wallet():
while True: while True:
subscription_id = await subscribe(websocket) subscription_id = await subscribe(websocket)
if subscription_id: if subscription_id:
await process_messages(websocket, subscription_id) process_task = asyncio.create_task(process_messages(websocket, subscription_id))
while True:
await asyncio.sleep(SUBSCRIBE_INTERVAL) try:
await unsubscribe(websocket, subscription_id) await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL)
except asyncio.TimeoutError:
# Timeout occurred, time to resubscribe
process_task.cancel()
try:
await process_task
except asyncio.CancelledError:
pass
await unsubscribe(websocket, subscription_id)
subscription_id = await subscribe(websocket)
if subscription_id:
process_task = asyncio.create_task(process_messages(websocket, subscription_id))
else:
break
else:
# process_messages completed (shouldn't happen unless there's an error)
break
heartbeat_task.cancel() heartbeat_task.cancel()
@ -1184,7 +1214,7 @@ async def subscribe(websocket):
if 'result' in response_data: if 'result' in response_data:
subscription_id = response_data['result'] subscription_id = response_data['result']
logger.info(f"Subscription successful. Subscription id: {subscription_id}") logger.info(f"Subscription successful. Subscription id: {subscription_id}")
await send_telegram_message("Connected to Solana network. Watching for transactions now.") await send_telegram_message("Solana mainnet connected... Listening for transactions.")
return subscription_id return subscription_id
else: else:
logger.warning(f"Unexpected response: {response_data}") logger.warning(f"Unexpected response: {response_data}")