using prod server

This commit is contained in:
Dobromir Popov 2024-10-19 23:39:02 +03:00
parent be0e9b54ac
commit 6088379ff7

View File

@ -1,4 +1,6 @@
import asyncio import asyncio
import uvicorn
from asgiref.wsgi import WsgiToAsgi
import websockets import websockets
import json import json
from flask import Flask, render_template, request, jsonify from flask import Flask, render_template, request, jsonify
@ -35,10 +37,10 @@ from dotenv import load_dotenv,set_key
import aiohttp import aiohttp
from typing import List, Dict from typing import List, Dict
import requests import requests
import threading
import re import re
from typing import List, Dict, Any, Tuple from typing import List, Dict, Any, Tuple
import random import random
from threading import Thread
app = Flask(__name__) app = Flask(__name__)
@ -77,6 +79,7 @@ def get_latest_log_file():
return None return None
# Flask route to retry processing the last log # Flask route to retry processing the last log
@app.route('/retry', methods=['GET'])
@app.route('/retry-last-log', methods=['GET']) @app.route('/retry-last-log', methods=['GET'])
async def retry_last_log(): async def retry_last_log():
latest_log_file = get_latest_log_file() latest_log_file = get_latest_log_file()
@ -100,7 +103,23 @@ async def retry_last_log():
logging.error(f"Error processing log dump: {e}") logging.error(f"Error processing log dump: {e}")
return jsonify({"error": "Failed to process log"}), 500 return jsonify({"error": "Failed to process log"}), 500
#const webhookPath = `/tr/${followedWallet.toBase58()}/${logs.signature}`;
@app.route('/tr/<wallet>/<tx_signature>', methods=['GET', 'POST'])
async def transaction_notified(wallet, tx_signature):
try:
logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}")
# Process the transaction
# tr = await get_swap_transaction_details(tx_signature)
tr = await get_transaction_details_info(tx_signature, [])
get_token_metadata_symbol(tr)
# ToDo - probably optimize
await follow_move(tr['token_in'])
await follow_move(tr['token_out'])
await save_token_info()
return jsonify(tr), 200
except Exception as e:
logging.error(f"Error processing transaction: {e}")
return jsonify({"error": "Failed to process transaction"}), 500
# Configuration # Configuration
@ -469,7 +488,7 @@ async def get_wallet_balances(wallet_address, doGetTokenName=True):
# sleep for 1 second to avoid rate limiting # sleep for 1 second to avoid rate limiting
await asyncio.sleep(2) await asyncio.sleep(2)
TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals) TOKENS_INFO[mint]['holdedAmount'] = round(amount, decimals)
TOKENS_INFO[mint]['decimals'] = decimals TOKENS_INFO[mint]['decimals'] = decimals
balances[mint] = { balances[mint] = {
'name': token_name or 'N/A', 'name': token_name or 'N/A',
@ -838,6 +857,17 @@ PROCESSING_LOG = False
async def process_log(log_result): async def process_log(log_result):
global PROCESSING_LOG global PROCESSING_LOG
tr_details = {
"order_id": None,
"token_in": None,
"token_out": None,
"amount_in": 0,
"amount_out": 0,
"amount_in_USD": 0,
"amount_out_USD": 0,
"percentage_swapped": 0
}
if log_result['value']['err']: if log_result['value']['err']:
return return
@ -855,16 +885,6 @@ async def process_log(log_result):
before_source_balance = 0 before_source_balance = 0
source_token_change = 0 source_token_change = 0
tr_details = {
"order_id": None,
"token_in": None,
"token_out": None,
"amount_in": 0,
"amount_out": 0,
"amount_in_USD": 0,
"amount_out_USD": 0,
"percentage_swapped": 0
}
i = 0 i = 0
while i < len(logs): while i < len(logs):
log_entry = logs[i] log_entry = logs[i]
@ -956,6 +976,7 @@ async def process_log(log_result):
PROCESSING_LOG = False PROCESSING_LOG = False
return tr_details return tr_details
# "Program log: Instruction: Swap2", # "Program log: Instruction: Swap2",
# "Program log: order_id: 13985890735038016", # "Program log: order_id: 13985890735038016",
# "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn", source # "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn", source
@ -1003,7 +1024,7 @@ async def follow_move(move):
# Use the balance # Use the balance
print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}") print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
else: else:
print("No ballance found for {move['symbol_in']}. Skipping move.") print(f"No ballance found for {move['symbol_in']}. Skipping move.")
await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
return return
@ -1173,7 +1194,7 @@ async def wallet_watch_loop():
subscription_id = await subscribe(websocket) subscription_id = await subscribe(websocket)
if subscription_id is not None: if subscription_id is not None:
await send_telegram_message(f"Solana mainnet connected ({subscription_id})...") # await send_telegram_message(f"Solana mainnet connected ({subscription_id})...")
if first_subscription: if first_subscription:
asyncio.create_task( list_initial_wallet_states()) asyncio.create_task( list_initial_wallet_states())
first_subscription = False first_subscription = False
@ -1279,7 +1300,7 @@ async def process_messages(websocket):
except websockets.exceptions.ConnectionClosedError as e: except websockets.exceptions.ConnectionClosedError as e:
logger.error(f"Connection closed unexpectedly: {e}") logger.error(f"Connection closed unexpectedly: {e}")
await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") # await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
pass pass
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON: {e}") logger.error(f"Failed to decode JSON: {e}")
@ -1309,22 +1330,43 @@ async def check_PK():
await send_telegram_message("<b>Warning:</b> Private key not found in environment variables. Will not be able to sign transactions.") await send_telegram_message("<b>Warning:</b> Private key not found in environment variables. Will not be able to sign transactions.")
# Convert Flask app to ASGI
asgi_app = WsgiToAsgi(app)
async def main(): async def main():
await send_telegram_message("Solana Agent Started. Connecting to mainnet...") await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
await check_PK() await check_PK()
await wallet_watch_loop() await wallet_watch_loop()
async def run_flask(): def run_asyncio_loop(loop):
loop = asyncio.get_running_loop() asyncio.set_event_loop(loop)
await loop.run_in_executor(None, lambda: app.run(debug=False, port=3001, use_reloader=False)) loop.run_forever()
async def run_all(): async def run_all():
await asyncio.gather( main_task = asyncio.create_task(main())
main(), await main_task
run_flask()
)
if __name__ == '__main__': if __name__ == '__main__':
asyncio.run(run_all()) # Create a new event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Start the asyncio loop in a separate thread
thread = Thread(target=run_asyncio_loop, args=(loop,))
thread.start()
# Schedule the run_all coroutine in the event loop
asyncio.run_coroutine_threadsafe(run_all(), loop)
# Run Uvicorn in the main thread
uvicorn.run(
"app:asgi_app", # Replace 'app' with the actual name of this Python file if different
host="127.0.0.1",
port=3001,
log_level="debug",
reload=True
)
# When Uvicorn exits, stop the asyncio loop
loop.call_soon_threadsafe(loop.stop)
thread.join()