Merge commit '292cf1b6ad4c93ee2d924de7ba037f1a7874149f'

This commit is contained in:
Dobromir Popov 2024-10-11 23:57:53 +03:00
commit 780aac0405

View File

@ -653,7 +653,7 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False):
account_data_info = account_data_data['parsed']['info'] account_data_info = account_data_data['parsed']['info']
if 'mint' in account_data_info: if 'mint' in account_data_info:
transfer['mint'] = account_data_info['mint'] transfer['mint'] = account_data_info['mint']
if 'decimals' not in TOKENS_INFO[transfer['mint']]: if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]:
await get_token_metadata_symbol(transfer['mint']) await get_token_metadata_symbol(transfer['mint'])
# get actual prices # get actual prices
current_price = await get_token_prices([transfer['mint']]) current_price = await get_token_prices([transfer['mint']])
@ -833,14 +833,17 @@ async def save_log(log):
logging.error(f"Error saving RPC log: {e}") logging.error(f"Error saving RPC log: {e}")
PROCESSING_LOG = False
async def process_log(log_result): async def process_log(log_result):
global PROCESSING_LOG
if log_result['value']['err']: if log_result['value']['err']:
return return
logs = log_result['value']['logs'] logs = log_result['value']['logs']
try: try:
# Detect swap operations in logs # Detect swap operations in logs
PROCESSING_LOG = True
swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', 'Program log: Instruction: SwapExactAmountIn'] swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', 'Program log: Instruction: SwapExactAmountIn']
if any(op in logs for op in swap_operations): if any(op in logs for op in swap_operations):
@ -945,10 +948,12 @@ async def process_log(log_result):
await send_telegram_message(f"Not followed! Error following move.") await send_telegram_message(f"Not followed! Error following move.")
except Exception as e: except Exception as e:
logging.error(f"Error processing log: {e}") logging.error(f"Error processing log: {e}")
PROCESSING_LOG = False
return tr_details return tr_details
# "Program log: Instruction: Swap2", # "Program log: Instruction: Swap2",
# "Program log: order_id: 13985890735038016", # "Program log: order_id: 13985890735038016",
@ -1018,6 +1023,7 @@ async def follow_move(move):
# Calculate the amount to swap based on the same percentage as the followed move # Calculate the amount to swap based on the same percentage as the followed move
amount_to_swap = your_balance * (move['percentage_swapped'] / 100) amount_to_swap = your_balance * (move['percentage_swapped'] / 100)
amount_to_swap = min( min(amount_to_swap, your_balance), 300)
# # always get 99% of the amount to swap # # always get 99% of the amount to swap
# amount_to_swap = amount_to_swap * 0.95 # amount_to_swap = amount_to_swap * 0.95
@ -1134,67 +1140,86 @@ SOLANA_ENDPOINTS = [
# "wss://mainnet.rpcpool.com", # "wss://mainnet.rpcpool.com",
] ]
PING_INTERVAL = 30 PING_INTERVAL = 30
SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 10 minutes SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes
async def heartbeat(websocket): # async def heartbeat(websocket):
while True: # while True:
try: # try:
await websocket.ping() # await websocket.ping()
await asyncio.sleep(PING_INTERVAL) # await asyncio.sleep(PING_INTERVAL)
except websockets.exceptions.ConnectionClosed: # except websockets.exceptions.ConnectionClosed:
break # break
first_subscription = True first_subscription = True
async def subscribe_to_wallet(): async def wallet_watch_loop():
global first_subscription global first_subscription
reconnect_delay = 5 reconnect_delay = 5
max_reconnect_delay = 60 max_reconnect_delay = 60
while True: while True:
try: try:
current_url = random.choice(SOLANA_ENDPOINTS) try:
async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket: subscription_id = None
logger.info(f"Connected to Solana websocket: {current_url}") current_url = random.choice(SOLANA_ENDPOINTS)
heartbeat_task = asyncio.create_task(heartbeat(websocket)) async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket:
if first_subscription: logger.info(f"Connected to Solana websocket: {current_url}")
asyncio.create_task( list_initial_wallet_states()) # heartbeat_task = asyncio.create_task(heartbeat(websocket))
first_subscription = False
while True: while True:
subscription_id = await subscribe(websocket) if websocket.closed:
if subscription_id: break
await send_telegram_message(f"Solana mainnet connected ({subscription_id})...")
process_task = asyncio.create_task(process_messages(websocket, subscription_id)) subscription_id = await subscribe(websocket)
while True: if subscription_id is not None:
try: await send_telegram_message(f"Solana mainnet connected ({subscription_id})...")
await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL) if first_subscription:
except asyncio.TimeoutError: asyncio.create_task( list_initial_wallet_states())
# Timeout occurred, time to resubscribe first_subscription = False
process_task.cancel() process_task = asyncio.create_task(process_messages(websocket, subscription_id))
while True:
try: try:
await process_task await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL)
except asyncio.CancelledError: except asyncio.TimeoutError:
pass # Timeout occurred, time to resubscribe
await unsubscribe(websocket, subscription_id) if not PROCESSING_LOG:
subscription_id = await subscribe(websocket) process_task.cancel()
if subscription_id: try:
process_task = asyncio.create_task(process_messages(websocket, subscription_id)) await process_task
except asyncio.CancelledError:
pass
await unsubscribe(websocket, subscription_id)
new_sub_id = await subscribe(websocket)
if new_sub_id is None: break
if new_sub_id > 1: # we sometimes get True instead of integer, so we cje
subscription_id = new_sub_id
logger.info(f"New subscription created with ID: {subscription_id}")
elif new_sub_id is True:
# Already subscribed
logger.info("Already subscribed, continuing with existing subscription")
if subscription_id:
process_task = asyncio.create_task(process_messages(websocket, subscription_id))
else: else:
# process_messages completed (shouldn't happen unless there's an error)
break break
else: else:
# process_messages completed (shouldn't happen unless there's an error) send_telegram_message("Failed to connect. Retrying...")
break
heartbeat_task.cancel() # heartbeat_task.cancel()
except websockets.exceptions.WebSocketException as e: except websockets.exceptions.WebSocketException as e:
logger.error(f"WebSocket error: {e}") logger.error(f"WebSocket error: {e}")
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
await unsubscribe(websocket, subscription_id)
await send_telegram_message("reconnecting...")
logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
websocket.close()
except Exception as e: except Exception as e:
logger.error(f"An unexpected error occurred: {e}") logger.error(f"An unexpected error occurred - breaking watch loop: {e}")
await unsubscribe(websocket, subscription_id)
await send_telegram_message("reconnecting...")
logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
await asyncio.sleep(reconnect_delay) await asyncio.sleep(reconnect_delay)
reconnect_delay = min(reconnect_delay * 1.2, max_reconnect_delay) reconnect_delay = min(reconnect_delay * 1.2, max_reconnect_delay)
@ -1208,19 +1233,27 @@ async def subscribe(websocket):
{"commitment": "confirmed"} {"commitment": "confirmed"}
] ]
} }
try:
await websocket.send(json.dumps(request))
logger.info("Subscription request sent")
await websocket.send(json.dumps(request)) response = await websocket.recv()
logger.info("Subscription request sent") response_data = json.loads(response)
response = await websocket.recv() if 'result' in response_data:
response_data = json.loads(response) subscription_id = response_data['result']
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
if 'result' in response_data: return subscription_id
subscription_id = response_data['result'] else:
logger.info(f"Subscription successful. Subscription id: {subscription_id}") logger.warning(f"Unexpected response: {response_data}")
return subscription_id return None
else: except websockets.exceptions.ConnectionClosedError as e:
logger.warning(f"Unexpected response: {response_data}") logger.error(f"Connection closed unexpectedly: {e}")
await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
await websocket.close()
return None
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
return None return None
async def unsubscribe(websocket, subscription_id): async def unsubscribe(websocket, subscription_id):
@ -1233,6 +1266,7 @@ async def unsubscribe(websocket, subscription_id):
} }
await websocket.send(json.dumps(request)) await websocket.send(json.dumps(request))
logger.info(f"Unsubscribed from subscription id: {subscription_id}") logger.info(f"Unsubscribed from subscription id: {subscription_id}")
subscription_id = None
async def process_messages(websocket, subscription_id): async def process_messages(websocket, subscription_id):
try: try:
@ -1285,7 +1319,7 @@ async def check_PK():
async def main(): async def main():
await send_telegram_message("Solana Agent Started. Connecting to mainnet...") await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
await check_PK() await check_PK()
await subscribe_to_wallet() await wallet_watch_loop()
async def run_flask(): async def run_flask():
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()