more stability improvements
This commit is contained in:
parent
8a371e4c8c
commit
292cf1b6ad
@ -653,7 +653,7 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False):
|
|||||||
account_data_info = account_data_data['parsed']['info']
|
account_data_info = account_data_data['parsed']['info']
|
||||||
if 'mint' in account_data_info:
|
if 'mint' in account_data_info:
|
||||||
transfer['mint'] = account_data_info['mint']
|
transfer['mint'] = account_data_info['mint']
|
||||||
if 'decimals' not in TOKENS_INFO[transfer['mint']]:
|
if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]:
|
||||||
await get_token_metadata_symbol(transfer['mint'])
|
await get_token_metadata_symbol(transfer['mint'])
|
||||||
# get actual prices
|
# get actual prices
|
||||||
current_price = await get_token_prices([transfer['mint']])
|
current_price = await get_token_prices([transfer['mint']])
|
||||||
@ -833,14 +833,17 @@ async def save_log(log):
|
|||||||
logging.error(f"Error saving RPC log: {e}")
|
logging.error(f"Error saving RPC log: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
PROCESSING_LOG = False
|
||||||
async def process_log(log_result):
|
async def process_log(log_result):
|
||||||
|
global PROCESSING_LOG
|
||||||
|
|
||||||
if log_result['value']['err']:
|
if log_result['value']['err']:
|
||||||
return
|
return
|
||||||
|
|
||||||
logs = log_result['value']['logs']
|
logs = log_result['value']['logs']
|
||||||
try:
|
try:
|
||||||
# Detect swap operations in logs
|
# Detect swap operations in logs
|
||||||
|
PROCESSING_LOG = True
|
||||||
swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', 'Program log: Instruction: SwapExactAmountIn']
|
swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', 'Program log: Instruction: SwapExactAmountIn']
|
||||||
|
|
||||||
if any(op in logs for op in swap_operations):
|
if any(op in logs for op in swap_operations):
|
||||||
@ -944,11 +947,13 @@ async def process_log(log_result):
|
|||||||
logging.error(f"Error aquiring log details and following: {e}")
|
logging.error(f"Error aquiring log details and following: {e}")
|
||||||
await send_telegram_message(f"Not followed! Error following move.")
|
await send_telegram_message(f"Not followed! Error following move.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Error processing log: {e}")
|
logging.error(f"Error processing log: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
PROCESSING_LOG = False
|
||||||
return tr_details
|
return tr_details
|
||||||
# "Program log: Instruction: Swap2",
|
# "Program log: Instruction: Swap2",
|
||||||
# "Program log: order_id: 13985890735038016",
|
# "Program log: order_id: 13985890735038016",
|
||||||
@ -1018,6 +1023,7 @@ async def follow_move(move):
|
|||||||
# Calculate the amount to swap based on the same percentage as the followed move
|
# Calculate the amount to swap based on the same percentage as the followed move
|
||||||
amount_to_swap = your_balance * (move['percentage_swapped'] / 100)
|
amount_to_swap = your_balance * (move['percentage_swapped'] / 100)
|
||||||
|
|
||||||
|
amount_to_swap = min( min(amount_to_swap, your_balance), 300)
|
||||||
# # always get 99% of the amount to swap
|
# # always get 99% of the amount to swap
|
||||||
# amount_to_swap = amount_to_swap * 0.95
|
# amount_to_swap = amount_to_swap * 0.95
|
||||||
|
|
||||||
@ -1134,67 +1140,86 @@ SOLANA_ENDPOINTS = [
|
|||||||
# "wss://mainnet.rpcpool.com",
|
# "wss://mainnet.rpcpool.com",
|
||||||
]
|
]
|
||||||
PING_INTERVAL = 30
|
PING_INTERVAL = 30
|
||||||
SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 10 minutes
|
SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes
|
||||||
|
|
||||||
|
|
||||||
async def heartbeat(websocket):
|
# async def heartbeat(websocket):
|
||||||
while True:
|
# while True:
|
||||||
try:
|
# try:
|
||||||
await websocket.ping()
|
# await websocket.ping()
|
||||||
await asyncio.sleep(PING_INTERVAL)
|
# await asyncio.sleep(PING_INTERVAL)
|
||||||
except websockets.exceptions.ConnectionClosed:
|
# except websockets.exceptions.ConnectionClosed:
|
||||||
break
|
# break
|
||||||
|
|
||||||
first_subscription = True
|
first_subscription = True
|
||||||
async def subscribe_to_wallet():
|
async def wallet_watch_loop():
|
||||||
global first_subscription
|
global first_subscription
|
||||||
reconnect_delay = 5
|
reconnect_delay = 5
|
||||||
max_reconnect_delay = 60
|
max_reconnect_delay = 60
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
current_url = random.choice(SOLANA_ENDPOINTS)
|
try:
|
||||||
async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket:
|
subscription_id = None
|
||||||
logger.info(f"Connected to Solana websocket: {current_url}")
|
current_url = random.choice(SOLANA_ENDPOINTS)
|
||||||
heartbeat_task = asyncio.create_task(heartbeat(websocket))
|
async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket:
|
||||||
if first_subscription:
|
logger.info(f"Connected to Solana websocket: {current_url}")
|
||||||
asyncio.create_task( list_initial_wallet_states())
|
# heartbeat_task = asyncio.create_task(heartbeat(websocket))
|
||||||
first_subscription = False
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
subscription_id = await subscribe(websocket)
|
if websocket.closed:
|
||||||
if subscription_id:
|
break
|
||||||
await send_telegram_message(f"Solana mainnet connected ({subscription_id})...")
|
|
||||||
process_task = asyncio.create_task(process_messages(websocket, subscription_id))
|
subscription_id = await subscribe(websocket)
|
||||||
while True:
|
if subscription_id is not None:
|
||||||
try:
|
await send_telegram_message(f"Solana mainnet connected ({subscription_id})...")
|
||||||
await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL)
|
if first_subscription:
|
||||||
except asyncio.TimeoutError:
|
asyncio.create_task( list_initial_wallet_states())
|
||||||
# Timeout occurred, time to resubscribe
|
first_subscription = False
|
||||||
process_task.cancel()
|
process_task = asyncio.create_task(process_messages(websocket, subscription_id))
|
||||||
|
while True:
|
||||||
try:
|
try:
|
||||||
await process_task
|
await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL)
|
||||||
except asyncio.CancelledError:
|
except asyncio.TimeoutError:
|
||||||
pass
|
# Timeout occurred, time to resubscribe
|
||||||
await unsubscribe(websocket, subscription_id)
|
if not PROCESSING_LOG:
|
||||||
subscription_id = await subscribe(websocket)
|
process_task.cancel()
|
||||||
if subscription_id:
|
try:
|
||||||
process_task = asyncio.create_task(process_messages(websocket, subscription_id))
|
await process_task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
await unsubscribe(websocket, subscription_id)
|
||||||
|
new_sub_id = await subscribe(websocket)
|
||||||
|
if new_sub_id is None: break
|
||||||
|
if new_sub_id > 1: # we sometimes get True instead of integer, so we cje
|
||||||
|
subscription_id = new_sub_id
|
||||||
|
logger.info(f"New subscription created with ID: {subscription_id}")
|
||||||
|
elif new_sub_id is True:
|
||||||
|
# Already subscribed
|
||||||
|
logger.info("Already subscribed, continuing with existing subscription")
|
||||||
|
if subscription_id:
|
||||||
|
process_task = asyncio.create_task(process_messages(websocket, subscription_id))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
# process_messages completed (shouldn't happen unless there's an error)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
# process_messages completed (shouldn't happen unless there's an error)
|
send_telegram_message("Failed to connect. Retrying...")
|
||||||
break
|
|
||||||
|
|
||||||
heartbeat_task.cancel()
|
# heartbeat_task.cancel()
|
||||||
|
|
||||||
except websockets.exceptions.WebSocketException as e:
|
except websockets.exceptions.WebSocketException as e:
|
||||||
logger.error(f"WebSocket error: {e}")
|
logger.error(f"WebSocket error: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"An unexpected error occurred: {e}")
|
||||||
|
|
||||||
|
await unsubscribe(websocket, subscription_id)
|
||||||
|
await send_telegram_message("reconnecting...")
|
||||||
|
logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
|
||||||
|
websocket.close()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"An unexpected error occurred: {e}")
|
logger.error(f"An unexpected error occurred - breaking watch loop: {e}")
|
||||||
await unsubscribe(websocket, subscription_id)
|
|
||||||
await send_telegram_message("reconnecting...")
|
|
||||||
logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
|
|
||||||
await asyncio.sleep(reconnect_delay)
|
await asyncio.sleep(reconnect_delay)
|
||||||
reconnect_delay = min(reconnect_delay * 1.2, max_reconnect_delay)
|
reconnect_delay = min(reconnect_delay * 1.2, max_reconnect_delay)
|
||||||
|
|
||||||
@ -1208,19 +1233,27 @@ async def subscribe(websocket):
|
|||||||
{"commitment": "confirmed"}
|
{"commitment": "confirmed"}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
try:
|
||||||
await websocket.send(json.dumps(request))
|
await websocket.send(json.dumps(request))
|
||||||
logger.info("Subscription request sent")
|
logger.info("Subscription request sent")
|
||||||
|
|
||||||
response = await websocket.recv()
|
response = await websocket.recv()
|
||||||
response_data = json.loads(response)
|
response_data = json.loads(response)
|
||||||
|
|
||||||
if 'result' in response_data:
|
if 'result' in response_data:
|
||||||
subscription_id = response_data['result']
|
subscription_id = response_data['result']
|
||||||
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
|
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
|
||||||
return subscription_id
|
return subscription_id
|
||||||
else:
|
else:
|
||||||
logger.warning(f"Unexpected response: {response_data}")
|
logger.warning(f"Unexpected response: {response_data}")
|
||||||
|
return None
|
||||||
|
except websockets.exceptions.ConnectionClosedError as e:
|
||||||
|
logger.error(f"Connection closed unexpectedly: {e}")
|
||||||
|
await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
|
||||||
|
await websocket.close()
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"An unexpected error occurred: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
async def unsubscribe(websocket, subscription_id):
|
async def unsubscribe(websocket, subscription_id):
|
||||||
@ -1233,6 +1266,7 @@ async def unsubscribe(websocket, subscription_id):
|
|||||||
}
|
}
|
||||||
await websocket.send(json.dumps(request))
|
await websocket.send(json.dumps(request))
|
||||||
logger.info(f"Unsubscribed from subscription id: {subscription_id}")
|
logger.info(f"Unsubscribed from subscription id: {subscription_id}")
|
||||||
|
subscription_id = None
|
||||||
|
|
||||||
async def process_messages(websocket, subscription_id):
|
async def process_messages(websocket, subscription_id):
|
||||||
try:
|
try:
|
||||||
@ -1285,7 +1319,7 @@ async def check_PK():
|
|||||||
async def main():
|
async def main():
|
||||||
await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
|
await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
|
||||||
await check_PK()
|
await check_PK()
|
||||||
await subscribe_to_wallet()
|
await wallet_watch_loop()
|
||||||
|
|
||||||
async def run_flask():
|
async def run_flask():
|
||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user