diff --git a/crypto/sol/app.py b/crypto/sol/app.py
index 867e270..587da77 100644
--- a/crypto/sol/app.py
+++ b/crypto/sol/app.py
@@ -16,18 +16,18 @@ import requests
import re
import random
from threading import Thread
-from solana.keypair import Keypair
from solana.rpc.async_api import AsyncClient
-from solana.transaction import VersionedTransaction, TxOpts
-from solana.rpc.types import Processed
-from jupiter import Jupiter
-
-app = Flask(__name__)
+from solders.transaction import VersionedTransaction
+from solana.rpc.types import TxOpts
+from solana.rpc.commitment import Confirmed, Finalized, Processed
+from solders.keypair import Keypair
+from jupiter_python_sdk.jupiter import Jupiter
from modules.webui import init_app
from modules.storage import init_db, store_transaction
-from modules.utils import telegram_utils, logging, get_pk, send_telegram_message
-from modules.SolanaAPI import SAPI, SolanaAPI, get_wallet_balances, get_transaction_details_with_retry, save_token_info
+from modules.utils import telegram_utils, logging, get_pk
+from modules.SolanaAPI import SAPI
+
# config = load_config()
load_dotenv()
@@ -199,11 +199,11 @@ async def process_log(log_result):
)
await telegram_utils.send_telegram_message(message_text)
await follow_move(tr_details)
- await save_token_info()
+ await SAPI.save_token_info()
except Exception as e:
logging.error(f"Error aquiring log details and following: {e}")
- await send_telegram_message(f"Not followed! Error following move.")
+ await telegram_utils.send_telegram_message(f"Not followed! Error following move.")
@@ -244,21 +244,21 @@ def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
async def follow_move(move):
- your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
+ your_balances = await SAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
if your_balance_info is not None:
# Use the balance
print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
else:
print(f"No ballance found for {move['symbol_in']}. Skipping move.")
- await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
+ await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
return
your_balance = your_balance_info['amount']
token_info = TOKENS_INFO.get(move['token_in'])
- token_name_in = token_info.get('symbol') or await get_token_metadata(move['token_in'])
+ token_name_in = token_info.get('symbol') or await SAPI.get_token_metadata(move['token_in'])
token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await solanaAPI.get_token_metadata_symbol(move['token_out'])
if not your_balance:
@@ -279,7 +279,7 @@ async def follow_move(move):
except ValueError:
msg = f"Move not followed:\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number."
logging.warning(msg)
- await send_telegram_message(msg)
+ await telegram_utils.send_telegram_message(msg)
return
amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have
@@ -325,7 +325,8 @@ async def follow_move(move):
logging.info(f"Initiating move. Transaction data:\n {transaction_data}")
error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}")
raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
- signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message))
+ message = raw_transaction.message
+ signature = private_key.sign_message(message.to_bytes_versioned())
signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
@@ -338,7 +339,7 @@ async def follow_move(move):
notification += f"\n\nTransaction: {transaction_id}"
await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
- tx_details = await get_transaction_details_with_retry(transaction_id)
+ tx_details = await SAPI.get_transaction_details_with_retry(transaction_id)
if tx_details is not None:
break
@@ -354,7 +355,7 @@ async def follow_move(move):
await telegram_utils.send_telegram_message(error_message)
amount = amount * 0.75
- await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
+ await SAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
try:
if tx_details is None:
@@ -377,7 +378,7 @@ async def follow_move(move):
f"\n\nTransaction: {transaction_id}"
)
logging.info(notification)
- await send_telegram_message(notification)
+ await telegram_utils.send_telegram_message(notification)
except Exception as e:
logging.error(f"Error sending notification: {e}")
@@ -430,34 +431,38 @@ async def process_messages(websocket):
logger.error(f"An unexpected error occurred: {e}")
-pk = get_pk()
+pk = None
# Convert Flask app to ASGI
-asgi_app = WsgiToAsgi(app)
+asgi_app = WsgiToAsgi(init_app)
async def main():
- global solanaAPI, bot, PROCESSING_LOG
+ global solanaAPI, bot, PROCESSING_LOG, pk
+ pk = await get_pk()
await telegram_utils.initialize()
await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")
# process_transaction
await SAPI.wallet_watch_loop()
-
-async def run_all():
- await main()
+def run_asyncio_tasks():
+ asyncio.run(main())
if __name__ == '__main__':
- try:
- asyncio.run(run_all())
- except Exception as e:
- logging.error(f"An error occurred: {e}")
+ import multiprocessing
- flask_app = init_app()
+ # Start the asyncio tasks in a separate process
+ process = multiprocessing.Process(target=run_asyncio_tasks)
+ process.start()
+
+ # Run the ASGI server
uvicorn.run(
- flask_app,
+ "app:asgi_app",
host="127.0.0.1",
port=3001,
log_level="debug",
reload=True
)
+
+ # Wait for the asyncio tasks to complete
+ process.join()
diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py
index 1328d71..7b22ca4 100644
--- a/crypto/sol/modules/SolanaAPI.py
+++ b/crypto/sol/modules/SolanaAPI.py
@@ -74,7 +74,8 @@ class SolanaWS:
self.message_queue = asyncio.Queue()
self.on_message = on_message
self.websocket = None
-
+ self.last_msg_responded = False
+
async def connect(self):
while True:
try:
@@ -85,8 +86,10 @@ class SolanaWS:
except Exception as e:
logger.error(f"Failed to connect to {current_url}: {e}")
await asyncio.sleep(5)
+
- async def ws_jsonrpc(self, ws, method, params=None, doProcessResponse = True):
+ async def ws_jsonrpc(self, method, params=None, doProcessResponse = True):
+
if not isinstance(params, list):
params = [params] if params is not None else []
@@ -96,14 +99,15 @@ class SolanaWS:
"method": method,
"params": params
}
-
- await ws.send(json.dumps(request))
+ self.last_msg_responded = False
+ await self.websocket.send(json.dumps(request))
if not doProcessResponse:
return None
else:
response = await self.websocket.recv()
response_data = json.loads(response)
-
+ self.last_msg_responded = True
+
if 'result' in response_data:
return response_data['result']
elif 'error' in response_data:
@@ -112,17 +116,21 @@ class SolanaWS:
else:
logger.warning(f"Unexpected response: {response_data}")
return None
-
+
async def subscribe(self):
params = [
{"mentions": [FOLLOWED_WALLET]},
{"commitment": "confirmed"}
]
- result = await self.ws_jsonrpc("logsSubscribe", params, doProcessResponse=False)
- response = process_messages(self.websocket)
- if result is not None:
+ # define onmessage as inline callback to get subscription_id which waits for last_msg_responded
+ # self.on_message = lambda message: self.subscription_id = message.get('result')
+ result = await self.ws_jsonrpc("logsSubscribe", params)
+
+ if result is not None and result > 0:
self.subscription_id = result
logger.info(f"Subscription successful. Subscription id: {self.subscription_id}")
+ elif result:
+ logger.error("already subscribed")
else:
logger.error("Failed to subscribe")
@@ -190,8 +198,13 @@ class SolanaWS:
class SolanaAPI:
def __init__(self, process_transaction_callback = None, on_initial_subscription_callback = None, on_bot_message=None):
self.process_transaction = process_transaction_callback
- self.on_initial_subscription = on_initial_subscription_callback
- self.on_bot_message = on_bot_message,
+ self.on_initial_subscription = on_initial_subscription_callback
+ # if callable(on_initial_subscription_callback) else lambda: None
+
+ # Define a default lambda function for on_bot_message
+ default_on_bot_message = lambda message: logger.info(f"Bot message: {message}")
+ # Use the provided on_bot_message if it's callable, otherwise use the default
+ self.on_bot_message = on_bot_message if callable(on_bot_message) else default_on_bot_message
self.dex = SolanaDEX(DISPLAY_CURRENCY)
self.solana_ws = SolanaWS(on_message=self.process_transaction)
@@ -201,6 +214,7 @@ class SolanaAPI:
message = await solana_ws.message_queue.get()
await self.process_transaction(message)
+
_first_subscription = True
async def wallet_watch_loop(self):
@@ -212,11 +226,11 @@ class SolanaAPI:
await solana_ws.connect()
await solana_ws.subscribe()
- if first_subscription:
- asyncio.create_task(self.on_initial_subscription())
+ if first_subscription and self.on_initial_subscription is not None:
+ await self.on_initial_subscription
first_subscription = False
- await self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...")
+ self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...")
receive_task = asyncio.create_task(solana_ws.receive_messages())
process_task = asyncio.create_task(solana_ws.process_messages())
@@ -235,7 +249,8 @@ class SolanaAPI:
await solana_ws.unsubscribe()
if solana_ws.websocket:
await solana_ws.close()
- await self.on_bot_message("Reconnecting...")
+ if self.on_bot_message:
+ await self.on_bot_message("Reconnecting...")
await asyncio.sleep(5)
async def get_last_transactions(self, account_address, check_interval=300, limit=1000):
@@ -862,4 +877,4 @@ async def save_token_info():
json.dump(TOKENS_INFO, f, indent=2)
-SAPI = SolanaAPI()
\ No newline at end of file
+SAPI = SolanaAPI( on_initial_subscription_callback=SolanaDEX.list_initial_wallet_states())
\ No newline at end of file