diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 8bd5efe..8867c9f 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -1,4 +1,6 @@ -import asyncio +import concurrent.futures +import threading +import queue import uvicorn import os from dotenv import load_dotenv @@ -11,15 +13,14 @@ from modules.utils import telegram_utils, logging, get_pk from modules.log_processor import watch_for_new_logs from modules.SolanaAPI import SAPI from config import DO_WATCH_WALLET - from asgiref.wsgi import WsgiToAsgi from multiprocessing import Process - +import time load_dotenv() load_dotenv('.env.secret') -async def save_log(log): +def save_log(log): try: os.makedirs('./logs', exist_ok=True) timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") @@ -30,8 +31,9 @@ async def save_log(log): logging.error(f"Error saving RPC log: {e}") PROCESSING_LOG = False +log_queue = queue.Queue() -async def process_log(log_result): +def process_log(log_result): global PROCESSING_LOG tr_details = { "order_id": None, @@ -50,10 +52,11 @@ async def process_log(log_result): logs = log_result['value']['logs'] try: PROCESSING_LOG = True - swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', 'Program log: Instruction: SwapExactAmountIn', 'Program log: Instruction: SwapV2'] + swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', + 'Program log: Instruction: SwapExactAmountIn', 'Program log: Instruction: SwapV2'] if any(op in logs for op in swap_operations): - await save_log(log_result) + save_log(log_result) tx_signature_str = log_result['value']['signature'] before_source_balance = 0 @@ -75,9 +78,7 @@ async def process_log(log_result): tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6 elif "destination_token_change" in part: tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6 - - i += 1 - + if "before_source_balance" in log_entry: parts = log_entry.split(", ") for part in parts: @@ -88,11 +89,15 @@ async def process_log(log_result): for part in parts: if "source_token_change" in part: source_token_change = float(part.split(":")[-1].strip()) / 10 ** 6 + i += 1 try: - if tr_details["token_in"] is None or tr_details["token_out"] is None or tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0: + if tr_details["token_in"] is None or tr_details["token_out"] is None or \ + tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0: logging.warning("Incomplete swap details found in logs. Getting details from transaction") - tr_details = await SAPI.get_transaction_details_info(tx_signature_str, logs) + tr_details = SAPI.get_transaction_details_info(tx_signature_str, logs) + + if before_source_balance > 0 and source_token_change > 0: tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100 if tr_details["percentage_swapped"] > 100: @@ -109,19 +114,19 @@ async def process_log(log_result): except Exception as e: logging.error(f"Error fetching token prices: {e}") - + message_text = ( f"Swap detected: \n" - f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} ({tr_details['percentage_swapped']:.2f}% ) swapped for " - f"{tr_details['symbol_out']} \n" + f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} " + f"({tr_details['percentage_swapped']:.2f}% ) swapped for {tr_details['symbol_out']} \n" ) - await telegram_utils.send_telegram_message(message_text) - await SAPI.follow_move(tr_details) - await SAPI.save_token_info() + telegram_utils.send_telegram_message(message_text) + SAPI.follow_move(tr_details) + SAPI.save_token_info() except Exception as e: - logging.error(f"Error aquiring log details and following: {e}") - await telegram_utils.send_telegram_message(f"Not followed! Error following move.") + logging.error(f"Error acquiring log details and following: {e}") + telegram_utils.send_telegram_message(f"Not followed! Error following move.") except Exception as e: logging.error(f"Error processing log: {e}") @@ -129,63 +134,69 @@ async def process_log(log_result): PROCESSING_LOG = False return tr_details -async def process_messages(websocket): - try: +# def process_messages(websocket): +# try: +# while True: +# response = websocket.recv() +# response_data = json.loads(response) +# logger.debug(f"Received response: {response_data}") + +# if 'result' in response_data: +# new_sub_id = response_data['result'] +# if int(new_sub_id) > 1: +# subscription_id = new_sub_id +# logger.info(f"Subscription successful. New id: {subscription_id}") +# elif new_sub_id: +# logger.info(f"Existing subscription confirmed: {subscription_id}") +# else: +# return None +# return subscription_id +# elif 'params' in response_data: +# log = response_data['params']['result'] +# logger.debug(f"Received transaction log: {log}") +# log_queue.put(log) +# else: +# logger.warning(f"Unexpected response: {response_data}") + +# except Exception as e: +# logger.error(f"An error occurred: {e}") + +def log_processor_worker(): + with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: while True: - response = await websocket.recv() - response_data = json.loads(response) - logger.debug(f"Received response: {response_data}") - - if 'result' in response_data: - new_sub_id = response_data['result'] - if int(new_sub_id) > 1: - subscription_id = new_sub_id - logger.info(f"Subscription successful. New id: {subscription_id}") - elif new_sub_id: - logger.info(f"Existing subscription confirmed: {subscription_id}") - else: return None - return subscription_id - elif 'params' in response_data: - log = response_data['params']['result'] - logger.debug(f"Received transaction log: {log}") - asyncio.create_task(process_log(log)) - else: - logger.warning(f"Unexpected response: {response_data}") - - except websockets.exceptions.ConnectionClosedError as e: - logger.error(f"Connection closed unexpectedly: {e}") - pass - except json.JSONDecodeError as e: - logger.error(f"Failed to decode JSON: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - + try: + log = log_queue.get() + executor.submit(process_log, log) + except Exception as e: + logger.error(f"Error in log processor worker: {e}") + finally: + log_queue.task_done() logger = logging.getLogger(__name__) app = init_app() asgi_app = WsgiToAsgi(app) -async def restartable_task(task_func, *args, **kwargs): +def run_with_retry(task_func, *args, **kwargs): while True: try: - await task_func(*args, **kwargs) + task_func(*args, **kwargs) except Exception as e: error_msg = f"Error in task {task_func.__name__}: {e}" logger.error(error_msg) - await telegram_utils.send_telegram_message(error_msg) - await asyncio.sleep(5) + telegram_utils.send_telegram_message(error_msg) + time.sleep(5) -async def init_bot(): +def init_bot(): # Initialize bot components - pk = await get_pk() - await telegram_utils.initialize() - await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") + # pk = get_pk() + telegram_utils.initialize() + telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") # Start monitoring tasks - asyncio.create_task(restartable_task(watch_for_new_logs)) + threading.Thread(target=run_with_retry, args=(watch_for_new_logs,), daemon=True).start() if DO_WATCH_WALLET: - asyncio.create_task(restartable_task(SAPI.wallet_watch_loop)) + threading.Thread(target=run_with_retry, args=(SAPI.wallet_watch_loop,), daemon=True).start() def run_server(): uvicorn.run( @@ -196,19 +207,22 @@ def run_server(): reload=True ) -async def main(): - # Start bot initialization in background - bot_task = asyncio.create_task(init_bot()) +def main(): + # Start log processor worker + log_processor_thread = threading.Thread(target=log_processor_worker, daemon=True) + log_processor_thread.start() + + # Initialize bot + init_bot() # Start server in a separate process server_process = Process(target=run_server) server_process.start() try: - # Wait for bot initialization - await bot_task # Keep main process running - await asyncio.Event().wait() + while True: + time.sleep(1) except KeyboardInterrupt: logger.info("Shutting down...") finally: @@ -216,4 +230,4 @@ async def main(): server_process.join() if __name__ == '__main__': - asyncio.run(main()) \ No newline at end of file + main() \ No newline at end of file diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index b383a68..015ef22 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -770,7 +770,9 @@ class SolanaAPI: logging.info(f"Initiating move. Transaction data:\n {transaction_data}") raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data)) - priority_fee = await async_client.get_fee_for_message(raw_transaction.message) + fee = await async_client.get_fee_for_message(raw_transaction.message) + priority_fee = fee.value or 100_000 + # fee = await async_client.get_fee_for_message(transaction_data) # priority_fee = 0 @@ -785,16 +787,70 @@ class SolanaAPI: # message = raw_transaction.message # Add compute budget instruction to set priority fee - from solders.compute_budget import set_compute_unit_price - compute_budget_instruction = set_compute_unit_price(priority_fee) + # from solders.compute_budget import set_compute_unit_price + # compute_budget_instruction = set_compute_unit_price(priority_fee) # Add compute budget instruction to the transaction - raw_transaction.message.add_instruction(compute_budget_instruction) + # raw_transaction.message.add_instruction(compute_budget_instruction) + + # Create new instructions list with compute budget instruction first + # new_instructions = [compute_budget_instruction] + list(raw_transaction.message.instructions) + + # # Create a new message with the updated instructions + # from solders.message import MessageV0 + # new_message = MessageV0( + # instructions=new_instructions, + # address_table_lookups=raw_transaction.message.address_table_lookups, + # recent_blockhash=raw_transaction.message.recent_blockhash, + # payer=raw_transaction.message.payer + # ) - - # signature = private_key.sign_message( bytes(message) ) - signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message)) - signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature]) + # working - no priority fee + # signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message)) + # signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature]) + # new - not working + # signature = private_key.sign_message(new_message.to_bytes_versioned()) + # signed_txn = VersionedTransaction.populate(new_message, [signature]) + from solders.compute_budget import set_compute_unit_price, ID as COMPUTE_BUDGET_ID + + priority_fee_ix = set_compute_unit_price(priority_fee) + + # Get the current message + msg = raw_transaction.message + + new_account_keys = msg.account_keys + program_id_index = 0 + if COMPUTE_BUDGET_ID not in msg.account_keys: + new_account_keys = msg.account_keys + [COMPUTE_BUDGET_ID] + program_id_index = len(msg.account_keys) # Index of the newly added program ID + else: + new_account_keys = msg.account_keys + program_id_index = msg.account_keys.index(COMPUTE_BUDGET_ID) + + # Compile the priority fee instruction + compiled_priority_fee_ix = CompiledInstruction( + program_id_index=program_id_index, + accounts=bytes([]), + data=priority_fee_ix.data + ) + + # Add priority fee instruction at the beginning + new_instructions = [compiled_priority_fee_ix] + msg.instructions + + # Create new message with updated instructions + new_message = Message.new_with_compiled_instructions( + num_required_signatures=msg.header.num_required_signatures, + num_readonly_signed_accounts=msg.header.num_readonly_signed_accounts, + num_readonly_unsigned_accounts=msg.header.num_readonly_unsigned_accounts, + account_keys=new_account_keys, + recent_blockhash=msg.recent_blockhash, + instructions=new_instructions + ) + + #signature = private_key.sign_message(new_message.to_bytes_versioned()) + signature = private_key.sign_message(message.to_bytes_versioned(new_message)) + signed_txn = VersionedTransaction.populate(new_message, [signature]) + opts = TxOpts( skip_preflight=False, preflight_commitment=Processed, diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py index 00ad411..6dce7e2 100644 --- a/crypto/sol/modules/webui.py +++ b/crypto/sol/modules/webui.py @@ -308,7 +308,7 @@ def init_app(tr_handler=None): else: await SolanaAPI.SAPI.follow_move(tr) # Store the successful copytrade transaction - await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature) + # await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature) except Exception as e: # Store the failed copytrade transaction # await storage.store_transaction(wallet, "SWAP_FAIL", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)