From 86b3a086b90e5832f032d2b52f90f28223684779 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 22 Oct 2024 02:44:18 +0300 Subject: [PATCH] refactoring --- crypto/sol/app.py | 65 ++++++++++++++++++--------------- crypto/sol/modules/SolanaAPI.py | 47 ++++++++++++++++-------- 2 files changed, 66 insertions(+), 46 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 867e270..587da77 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -16,18 +16,18 @@ import requests import re import random from threading import Thread -from solana.keypair import Keypair from solana.rpc.async_api import AsyncClient -from solana.transaction import VersionedTransaction, TxOpts -from solana.rpc.types import Processed -from jupiter import Jupiter - -app = Flask(__name__) +from solders.transaction import VersionedTransaction +from solana.rpc.types import TxOpts +from solana.rpc.commitment import Confirmed, Finalized, Processed +from solders.keypair import Keypair +from jupiter_python_sdk.jupiter import Jupiter from modules.webui import init_app from modules.storage import init_db, store_transaction -from modules.utils import telegram_utils, logging, get_pk, send_telegram_message -from modules.SolanaAPI import SAPI, SolanaAPI, get_wallet_balances, get_transaction_details_with_retry, save_token_info +from modules.utils import telegram_utils, logging, get_pk +from modules.SolanaAPI import SAPI + # config = load_config() load_dotenv() @@ -199,11 +199,11 @@ async def process_log(log_result): ) await telegram_utils.send_telegram_message(message_text) await follow_move(tr_details) - await save_token_info() + await SAPI.save_token_info() except Exception as e: logging.error(f"Error aquiring log details and following: {e}") - await send_telegram_message(f"Not followed! Error following move.") + await telegram_utils.send_telegram_message(f"Not followed! Error following move.") @@ -244,21 +244,21 @@ def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: async def follow_move(move): - your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) + your_balances = await SAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False) your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) if your_balance_info is not None: # Use the balance print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}") else: print(f"No ballance found for {move['symbol_in']}. Skipping move.") - await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") + await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") return your_balance = your_balance_info['amount'] token_info = TOKENS_INFO.get(move['token_in']) - token_name_in = token_info.get('symbol') or await get_token_metadata(move['token_in']) + token_name_in = token_info.get('symbol') or await SAPI.get_token_metadata(move['token_in']) token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await solanaAPI.get_token_metadata_symbol(move['token_out']) if not your_balance: @@ -279,7 +279,7 @@ async def follow_move(move): except ValueError: msg = f"Move not followed:\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number." logging.warning(msg) - await send_telegram_message(msg) + await telegram_utils.send_telegram_message(msg) return amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have @@ -325,7 +325,8 @@ async def follow_move(move): logging.info(f"Initiating move. Transaction data:\n {transaction_data}") error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}") raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data)) - signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message)) + message = raw_transaction.message + signature = private_key.sign_message(message.to_bytes_versioned()) signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature]) opts = TxOpts(skip_preflight=False, preflight_commitment=Processed) @@ -338,7 +339,7 @@ async def follow_move(move): notification += f"\n\nTransaction: {transaction_id}" await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}") - tx_details = await get_transaction_details_with_retry(transaction_id) + tx_details = await SAPI.get_transaction_details_with_retry(transaction_id) if tx_details is not None: break @@ -354,7 +355,7 @@ async def follow_move(move): await telegram_utils.send_telegram_message(error_message) amount = amount * 0.75 - await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) + await SAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False) try: if tx_details is None: @@ -377,7 +378,7 @@ async def follow_move(move): f"\n\nTransaction: {transaction_id}" ) logging.info(notification) - await send_telegram_message(notification) + await telegram_utils.send_telegram_message(notification) except Exception as e: logging.error(f"Error sending notification: {e}") @@ -430,34 +431,38 @@ async def process_messages(websocket): logger.error(f"An unexpected error occurred: {e}") -pk = get_pk() +pk = None # Convert Flask app to ASGI -asgi_app = WsgiToAsgi(app) +asgi_app = WsgiToAsgi(init_app) async def main(): - global solanaAPI, bot, PROCESSING_LOG + global solanaAPI, bot, PROCESSING_LOG, pk + pk = await get_pk() await telegram_utils.initialize() await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") # process_transaction await SAPI.wallet_watch_loop() - -async def run_all(): - await main() +def run_asyncio_tasks(): + asyncio.run(main()) if __name__ == '__main__': - try: - asyncio.run(run_all()) - except Exception as e: - logging.error(f"An error occurred: {e}") + import multiprocessing - flask_app = init_app() + # Start the asyncio tasks in a separate process + process = multiprocessing.Process(target=run_asyncio_tasks) + process.start() + + # Run the ASGI server uvicorn.run( - flask_app, + "app:asgi_app", host="127.0.0.1", port=3001, log_level="debug", reload=True ) + + # Wait for the asyncio tasks to complete + process.join() diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index 1328d71..7b22ca4 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -74,7 +74,8 @@ class SolanaWS: self.message_queue = asyncio.Queue() self.on_message = on_message self.websocket = None - + self.last_msg_responded = False + async def connect(self): while True: try: @@ -85,8 +86,10 @@ class SolanaWS: except Exception as e: logger.error(f"Failed to connect to {current_url}: {e}") await asyncio.sleep(5) + - async def ws_jsonrpc(self, ws, method, params=None, doProcessResponse = True): + async def ws_jsonrpc(self, method, params=None, doProcessResponse = True): + if not isinstance(params, list): params = [params] if params is not None else [] @@ -96,14 +99,15 @@ class SolanaWS: "method": method, "params": params } - - await ws.send(json.dumps(request)) + self.last_msg_responded = False + await self.websocket.send(json.dumps(request)) if not doProcessResponse: return None else: response = await self.websocket.recv() response_data = json.loads(response) - + self.last_msg_responded = True + if 'result' in response_data: return response_data['result'] elif 'error' in response_data: @@ -112,17 +116,21 @@ class SolanaWS: else: logger.warning(f"Unexpected response: {response_data}") return None - + async def subscribe(self): params = [ {"mentions": [FOLLOWED_WALLET]}, {"commitment": "confirmed"} ] - result = await self.ws_jsonrpc("logsSubscribe", params, doProcessResponse=False) - response = process_messages(self.websocket) - if result is not None: + # define onmessage as inline callback to get subscription_id which waits for last_msg_responded + # self.on_message = lambda message: self.subscription_id = message.get('result') + result = await self.ws_jsonrpc("logsSubscribe", params) + + if result is not None and result > 0: self.subscription_id = result logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") + elif result: + logger.error("already subscribed") else: logger.error("Failed to subscribe") @@ -190,8 +198,13 @@ class SolanaWS: class SolanaAPI: def __init__(self, process_transaction_callback = None, on_initial_subscription_callback = None, on_bot_message=None): self.process_transaction = process_transaction_callback - self.on_initial_subscription = on_initial_subscription_callback - self.on_bot_message = on_bot_message, + self.on_initial_subscription = on_initial_subscription_callback + # if callable(on_initial_subscription_callback) else lambda: None + + # Define a default lambda function for on_bot_message + default_on_bot_message = lambda message: logger.info(f"Bot message: {message}") + # Use the provided on_bot_message if it's callable, otherwise use the default + self.on_bot_message = on_bot_message if callable(on_bot_message) else default_on_bot_message self.dex = SolanaDEX(DISPLAY_CURRENCY) self.solana_ws = SolanaWS(on_message=self.process_transaction) @@ -201,6 +214,7 @@ class SolanaAPI: message = await solana_ws.message_queue.get() await self.process_transaction(message) + _first_subscription = True async def wallet_watch_loop(self): @@ -212,11 +226,11 @@ class SolanaAPI: await solana_ws.connect() await solana_ws.subscribe() - if first_subscription: - asyncio.create_task(self.on_initial_subscription()) + if first_subscription and self.on_initial_subscription is not None: + await self.on_initial_subscription first_subscription = False - await self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") + self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") receive_task = asyncio.create_task(solana_ws.receive_messages()) process_task = asyncio.create_task(solana_ws.process_messages()) @@ -235,7 +249,8 @@ class SolanaAPI: await solana_ws.unsubscribe() if solana_ws.websocket: await solana_ws.close() - await self.on_bot_message("Reconnecting...") + if self.on_bot_message: + await self.on_bot_message("Reconnecting...") await asyncio.sleep(5) async def get_last_transactions(self, account_address, check_interval=300, limit=1000): @@ -862,4 +877,4 @@ async def save_token_info(): json.dump(TOKENS_INFO, f, indent=2) -SAPI = SolanaAPI() \ No newline at end of file +SAPI = SolanaAPI( on_initial_subscription_callback=SolanaDEX.list_initial_wallet_states()) \ No newline at end of file