diff --git a/crypto/sol/app.py b/crypto/sol/app.py index a3040a6..aa5a77c 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -1,4 +1,6 @@ import asyncio +import uvicorn +from asgiref.wsgi import WsgiToAsgi import websockets import json from flask import Flask, render_template, request, jsonify @@ -35,10 +37,10 @@ from dotenv import load_dotenv,set_key import aiohttp from typing import List, Dict import requests -import threading import re from typing import List, Dict, Any, Tuple import random +from threading import Thread app = Flask(__name__) @@ -77,6 +79,7 @@ def get_latest_log_file(): return None # Flask route to retry processing the last log +@app.route('/retry', methods=['GET']) @app.route('/retry-last-log', methods=['GET']) async def retry_last_log(): latest_log_file = get_latest_log_file() @@ -100,7 +103,23 @@ async def retry_last_log(): logging.error(f"Error processing log dump: {e}") return jsonify({"error": "Failed to process log"}), 500 - +#const webhookPath = `/tr/${followedWallet.toBase58()}/${logs.signature}`; +@app.route('/tr//', methods=['GET', 'POST']) +async def transaction_notified(wallet, tx_signature): + try: + logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}") + # Process the transaction + # tr = await get_swap_transaction_details(tx_signature) + tr = await get_transaction_details_info(tx_signature, []) + get_token_metadata_symbol(tr) + # ToDo - probably optimize + await follow_move(tr['token_in']) + await follow_move(tr['token_out']) + await save_token_info() + return jsonify(tr), 200 + except Exception as e: + logging.error(f"Error processing transaction: {e}") + return jsonify({"error": "Failed to process transaction"}), 500 # Configuration @@ -469,7 +488,7 @@ async def get_wallet_balances(wallet_address, doGetTokenName=True): # sleep for 1 second to avoid rate limiting await asyncio.sleep(2) - TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals) + TOKENS_INFO[mint]['holdedAmount'] = round(amount, decimals) TOKENS_INFO[mint]['decimals'] = decimals balances[mint] = { 'name': token_name or 'N/A', @@ -837,6 +856,17 @@ async def save_log(log): PROCESSING_LOG = False async def process_log(log_result): global PROCESSING_LOG + + tr_details = { + "order_id": None, + "token_in": None, + "token_out": None, + "amount_in": 0, + "amount_out": 0, + "amount_in_USD": 0, + "amount_out_USD": 0, + "percentage_swapped": 0 + } if log_result['value']['err']: return @@ -854,17 +884,7 @@ async def process_log(log_result): before_source_balance = 0 source_token_change = 0 - - tr_details = { - "order_id": None, - "token_in": None, - "token_out": None, - "amount_in": 0, - "amount_out": 0, - "amount_in_USD": 0, - "amount_out_USD": 0, - "percentage_swapped": 0 - } + i = 0 while i < len(logs): log_entry = logs[i] @@ -956,6 +976,7 @@ async def process_log(log_result): PROCESSING_LOG = False return tr_details + # "Program log: Instruction: Swap2", # "Program log: order_id: 13985890735038016", # "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn", source @@ -1003,7 +1024,7 @@ async def follow_move(move): # Use the balance print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}") else: - print("No ballance found for {move['symbol_in']}. Skipping move.") + print(f"No ballance found for {move['symbol_in']}. Skipping move.") await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") return @@ -1173,7 +1194,7 @@ async def wallet_watch_loop(): subscription_id = await subscribe(websocket) if subscription_id is not None: - await send_telegram_message(f"Solana mainnet connected ({subscription_id})...") + # await send_telegram_message(f"Solana mainnet connected ({subscription_id})...") if first_subscription: asyncio.create_task( list_initial_wallet_states()) first_subscription = False @@ -1279,7 +1300,7 @@ async def process_messages(websocket): except websockets.exceptions.ConnectionClosedError as e: logger.error(f"Connection closed unexpectedly: {e}") - await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") + # await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") pass except json.JSONDecodeError as e: logger.error(f"Failed to decode JSON: {e}") @@ -1309,22 +1330,43 @@ async def check_PK(): await send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.") - +# Convert Flask app to ASGI +asgi_app = WsgiToAsgi(app) async def main(): await send_telegram_message("Solana Agent Started. Connecting to mainnet...") await check_PK() await wallet_watch_loop() -async def run_flask(): - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, lambda: app.run(debug=False, port=3001, use_reloader=False)) +def run_asyncio_loop(loop): + asyncio.set_event_loop(loop) + loop.run_forever() async def run_all(): - await asyncio.gather( - main(), - run_flask() - ) + main_task = asyncio.create_task(main()) + await main_task if __name__ == '__main__': - asyncio.run(run_all()) \ No newline at end of file + # Create a new event loop + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Start the asyncio loop in a separate thread + thread = Thread(target=run_asyncio_loop, args=(loop,)) + thread.start() + + # Schedule the run_all coroutine in the event loop + asyncio.run_coroutine_threadsafe(run_all(), loop) + + # Run Uvicorn in the main thread + uvicorn.run( + "app:asgi_app", # Replace 'app' with the actual name of this Python file if different + host="127.0.0.1", + port=3001, + log_level="debug", + reload=True + ) + + # When Uvicorn exits, stop the asyncio loop + loop.call_soon_threadsafe(loop.stop) + thread.join() \ No newline at end of file