simplify app startup
This commit is contained in:
parent
5851af8f80
commit
8c75d1b650
@ -1,88 +1,36 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import uvicorn
|
import uvicorn
|
||||||
from asgiref.wsgi import WsgiToAsgi
|
|
||||||
import websockets
|
|
||||||
import json
|
|
||||||
|
|
||||||
import datetime
|
|
||||||
import os
|
import os
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
import datetime
|
||||||
|
import json
|
||||||
from threading import Thread
|
import websockets
|
||||||
from solana.rpc.async_api import AsyncClient
|
import logging
|
||||||
from solders.transaction import VersionedTransaction
|
|
||||||
from solana.rpc.types import TxOpts
|
|
||||||
from solders.keypair import Keypair
|
|
||||||
from jupiter_python_sdk.jupiter import Jupiter
|
|
||||||
from solana.rpc.commitment import Processed
|
|
||||||
|
|
||||||
from modules.webui import init_app
|
from modules.webui import init_app
|
||||||
from modules.storage import init_db, store_transaction
|
|
||||||
from modules.utils import telegram_utils, logging, get_pk
|
from modules.utils import telegram_utils, logging, get_pk
|
||||||
from modules.log_processor import watch_for_new_logs
|
from modules.log_processor import watch_for_new_logs
|
||||||
from modules.SolanaAPI import SAPI
|
from modules.SolanaAPI import SAPI
|
||||||
|
from config import DO_WATCH_WALLET
|
||||||
|
|
||||||
|
from asgiref.wsgi import WsgiToAsgi
|
||||||
|
from multiprocessing import Process
|
||||||
|
|
||||||
|
|
||||||
# config = load_config()
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
load_dotenv('.env.secret')
|
load_dotenv('.env.secret')
|
||||||
# Configuration
|
|
||||||
|
|
||||||
|
|
||||||
from config import (DO_WATCH_WALLET, logging, logger)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# # # # # # # # # # TELEGRAM # # # # # # # # # #
|
|
||||||
# if not telegram_utils.bot:
|
|
||||||
# try:
|
|
||||||
# asyncio.run(telegram_utils.initialize())
|
|
||||||
# except Exception as e:
|
|
||||||
# logging.error(f"Error initializing Telegram bot: {str(e)}")
|
|
||||||
# async def telegram_utils.send_telegram_message(message):
|
|
||||||
# try:
|
|
||||||
# await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML)
|
|
||||||
# logging.info(f"Telegram message sent: {message}")
|
|
||||||
# # logging.info(f"Telegram message dummy sent: {message}")
|
|
||||||
# except Exception as e:
|
|
||||||
# logging.error(f"Error sending Telegram message: {str(e)}")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# # # # # # # # # # DATABASE # # # # # # # # # #
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # #
|
|
||||||
|
|
||||||
|
|
||||||
# # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # #
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# # # # # # # # # # Functionality # # # # # # # # # #
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def save_log(log):
|
async def save_log(log):
|
||||||
try:
|
try:
|
||||||
os.makedirs('./logs', exist_ok=True)
|
os.makedirs('./logs', exist_ok=True)
|
||||||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
|
||||||
filename = f"./logs/log_{timestamp}.json"
|
filename = f"./logs/log_{timestamp}.json"
|
||||||
|
|
||||||
with open(filename, 'w') as f:
|
with open(filename, 'w') as f:
|
||||||
json.dump(log, f, indent=2)
|
json.dump(log, f, indent=2)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Error saving RPC log: {e}")
|
logging.error(f"Error saving RPC log: {e}")
|
||||||
|
|
||||||
|
|
||||||
PROCESSING_LOG = False
|
PROCESSING_LOG = False
|
||||||
|
|
||||||
async def process_log(log_result):
|
async def process_log(log_result):
|
||||||
global PROCESSING_LOG
|
global PROCESSING_LOG
|
||||||
tr_details = {
|
tr_details = {
|
||||||
@ -101,13 +49,10 @@ async def process_log(log_result):
|
|||||||
|
|
||||||
logs = log_result['value']['logs']
|
logs = log_result['value']['logs']
|
||||||
try:
|
try:
|
||||||
# Detect swap operations in logs
|
|
||||||
PROCESSING_LOG = True
|
PROCESSING_LOG = True
|
||||||
swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', 'Program log: Instruction: SwapExactAmountIn', 'Program log: Instruction: SwapV2']
|
swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2', 'Program log: Instruction: SwapExactAmountIn', 'Program log: Instruction: SwapV2']
|
||||||
|
|
||||||
|
|
||||||
if any(op in logs for op in swap_operations):
|
if any(op in logs for op in swap_operations):
|
||||||
# Save the log to a file
|
|
||||||
await save_log(log_result)
|
await save_log(log_result)
|
||||||
tx_signature_str = log_result['value']['signature']
|
tx_signature_str = log_result['value']['signature']
|
||||||
|
|
||||||
@ -118,29 +63,21 @@ async def process_log(log_result):
|
|||||||
while i < len(logs):
|
while i < len(logs):
|
||||||
log_entry = logs[i]
|
log_entry = logs[i]
|
||||||
|
|
||||||
# Check if we found the 'order_id'
|
|
||||||
if tr_details["order_id"] is None and "order_id" in log_entry:
|
if tr_details["order_id"] is None and "order_id" in log_entry:
|
||||||
# Extract the order_id
|
|
||||||
tr_details["order_id"] = log_entry.split(":")[-1].strip()
|
tr_details["order_id"] = log_entry.split(":")[-1].strip()
|
||||||
tr_details["token_in"] = logs[i + 1].split(":")[-1].strip()
|
tr_details["token_in"] = logs[i + 1].split(":")[-1].strip()
|
||||||
tr_details["token_out"] = logs[i + 2].split(":")[-1].strip()
|
tr_details["token_out"] = logs[i + 2].split(":")[-1].strip()
|
||||||
|
|
||||||
# Look for the token change amounts after tokens have been found
|
|
||||||
if "source_token_change" in log_entry:
|
if "source_token_change" in log_entry:
|
||||||
parts = log_entry.split(", ")
|
parts = log_entry.split(", ")
|
||||||
for part in parts:
|
for part in parts:
|
||||||
if "source_token_change" in part:
|
if "source_token_change" in part:
|
||||||
tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6 # Assuming 6 decimals
|
tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6
|
||||||
elif "destination_token_change" in part:
|
elif "destination_token_change" in part:
|
||||||
tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6 # Assuming 6 decimals
|
tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6
|
||||||
|
|
||||||
i += 1
|
i += 1
|
||||||
|
|
||||||
# calculate percentage swapped by digging before_source_balance, source_token_change and after_source_balance
|
|
||||||
|
|
||||||
# "Program log: before_source_balance: 19471871, before_destination_balance: 0, amount_in: 19471871, expect_amount_out: 770877527, min_return: 763168752",
|
|
||||||
# "Program log: after_source_balance: 0, after_destination_balance: 770570049",
|
|
||||||
# "Program log: source_token_change: 19471871, destination_token_change: 770570049",
|
|
||||||
if "before_source_balance" in log_entry:
|
if "before_source_balance" in log_entry:
|
||||||
parts = log_entry.split(", ")
|
parts = log_entry.split(", ")
|
||||||
for part in parts:
|
for part in parts:
|
||||||
@ -151,25 +88,16 @@ async def process_log(log_result):
|
|||||||
for part in parts:
|
for part in parts:
|
||||||
if "source_token_change" in part:
|
if "source_token_change" in part:
|
||||||
source_token_change = float(part.split(":")[-1].strip()) / 10 ** 6
|
source_token_change = float(part.split(":")[-1].strip()) / 10 ** 6
|
||||||
|
|
||||||
|
|
||||||
# GET DETAILS FROM TRANSACTION IF NOT FOUND IN LOGS
|
|
||||||
try:
|
try:
|
||||||
if tr_details["token_in"] is None or tr_details["token_out"] is None or tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0:
|
if tr_details["token_in"] is None or tr_details["token_out"] is None or tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0:
|
||||||
logging.warning("Incomplete swap details found in logs. Getting details from transaction")
|
logging.warning("Incomplete swap details found in logs. Getting details from transaction")
|
||||||
tr_details = await SAPI.get_transaction_details_info(tx_signature_str, logs)
|
tr_details = await SAPI.get_transaction_details_info(tx_signature_str, logs)
|
||||||
# onlt needed if no details got
|
|
||||||
if before_source_balance > 0 and source_token_change > 0:
|
if before_source_balance > 0 and source_token_change > 0:
|
||||||
tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100
|
tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100
|
||||||
#dirty fix for percentage > 100 (decimals 9 but expecting 6)
|
|
||||||
if tr_details["percentage_swapped"] > 100:
|
if tr_details["percentage_swapped"] > 100:
|
||||||
tr_details["percentage_swapped"] = tr_details["percentage_swapped"] / 1000
|
tr_details["percentage_swapped"] = tr_details["percentage_swapped"] / 1000
|
||||||
|
|
||||||
|
|
||||||
# update token info: ToDo: check, but already did
|
|
||||||
# all_token_addresses = list(set([tr_details["token_in"], tr_details["token_out"]]))
|
|
||||||
# await get_token_prices(all_token_addresses)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]]
|
token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]]
|
||||||
token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]]
|
token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]]
|
||||||
@ -184,7 +112,7 @@ async def process_log(log_result):
|
|||||||
|
|
||||||
message_text = (
|
message_text = (
|
||||||
f"<b>Swap detected: </b>\n"
|
f"<b>Swap detected: </b>\n"
|
||||||
f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} ({tr_details['percentage_swapped']:.2f}% ) swapped for " # ({tr_details['token_in']}) ({tr_details['token_out']})
|
f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} ({tr_details['percentage_swapped']:.2f}% ) swapped for "
|
||||||
f"{tr_details['symbol_out']} \n"
|
f"{tr_details['symbol_out']} \n"
|
||||||
)
|
)
|
||||||
await telegram_utils.send_telegram_message(message_text)
|
await telegram_utils.send_telegram_message(message_text)
|
||||||
@ -195,16 +123,12 @@ async def process_log(log_result):
|
|||||||
logging.error(f"Error aquiring log details and following: {e}")
|
logging.error(f"Error aquiring log details and following: {e}")
|
||||||
await telegram_utils.send_telegram_message(f"Not followed! Error following move.")
|
await telegram_utils.send_telegram_message(f"Not followed! Error following move.")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Error processing log: {e}")
|
logging.error(f"Error processing log: {e}")
|
||||||
|
|
||||||
|
|
||||||
PROCESSING_LOG = False
|
PROCESSING_LOG = False
|
||||||
return tr_details
|
return tr_details
|
||||||
|
|
||||||
|
|
||||||
async def process_messages(websocket):
|
async def process_messages(websocket):
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
@ -230,7 +154,6 @@ async def process_messages(websocket):
|
|||||||
|
|
||||||
except websockets.exceptions.ConnectionClosedError as e:
|
except websockets.exceptions.ConnectionClosedError as e:
|
||||||
logger.error(f"Connection closed unexpectedly: {e}")
|
logger.error(f"Connection closed unexpectedly: {e}")
|
||||||
# await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
|
|
||||||
pass
|
pass
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
logger.error(f"Failed to decode JSON: {e}")
|
logger.error(f"Failed to decode JSON: {e}")
|
||||||
@ -238,10 +161,8 @@ async def process_messages(websocket):
|
|||||||
logger.error(f"An unexpected error occurred: {e}")
|
logger.error(f"An unexpected error occurred: {e}")
|
||||||
|
|
||||||
|
|
||||||
pk = None
|
logger = logging.getLogger(__name__)
|
||||||
app = init_app()
|
app = init_app()
|
||||||
# app = init_app(follow_move_legacy)
|
|
||||||
# Convert Flask app to ASGI
|
|
||||||
asgi_app = WsgiToAsgi(app)
|
asgi_app = WsgiToAsgi(app)
|
||||||
|
|
||||||
async def restartable_task(task_func, *args, **kwargs):
|
async def restartable_task(task_func, *args, **kwargs):
|
||||||
@ -249,43 +170,50 @@ async def restartable_task(task_func, *args, **kwargs):
|
|||||||
try:
|
try:
|
||||||
await task_func(*args, **kwargs)
|
await task_func(*args, **kwargs)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Error in task {task_func.__name__}: {e}")
|
error_msg = f"Error in task {task_func.__name__}: {e}"
|
||||||
await telegram_utils.send_telegram_message(f"Error in task {task_func.__name__}: {e}")
|
logger.error(error_msg)
|
||||||
await asyncio.sleep(5) # Wait before retrying
|
await telegram_utils.send_telegram_message(error_msg)
|
||||||
|
await asyncio.sleep(5)
|
||||||
async def main():
|
|
||||||
global bot, PROCESSING_LOG, pk
|
|
||||||
|
|
||||||
|
async def init_bot():
|
||||||
|
# Initialize bot components
|
||||||
pk = await get_pk()
|
pk = await get_pk()
|
||||||
await telegram_utils.initialize()
|
await telegram_utils.initialize()
|
||||||
await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")
|
await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")
|
||||||
|
|
||||||
# Start the log processor
|
# Start monitoring tasks
|
||||||
asyncio.create_task(restartable_task(watch_for_new_logs))
|
asyncio.create_task(restartable_task(watch_for_new_logs))
|
||||||
# process_transaction
|
|
||||||
|
|
||||||
if DO_WATCH_WALLET:
|
if DO_WATCH_WALLET:
|
||||||
asyncio.create_task(restartable_task(SAPI.wallet_watch_loop))
|
asyncio.create_task(restartable_task(SAPI.wallet_watch_loop))
|
||||||
await SAPI.wallet_watch_loop()
|
|
||||||
|
|
||||||
def run_asyncio_tasks():
|
|
||||||
asyncio.run(main())
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
def run_server():
|
||||||
import multiprocessing
|
|
||||||
|
|
||||||
# Start the asyncio tasks in a separate process
|
|
||||||
process = multiprocessing.Process(target=run_asyncio_tasks)
|
|
||||||
process.start()
|
|
||||||
|
|
||||||
# Run the ASGI server
|
|
||||||
uvicorn.run(
|
uvicorn.run(
|
||||||
"app:asgi_app",
|
"app:asgi_app",
|
||||||
host="0.0.0.0",
|
host="0.0.0.0",
|
||||||
port=3001,
|
port=3001,
|
||||||
log_level="info", # "debug"
|
log_level="info",
|
||||||
reload=True
|
reload=True
|
||||||
)
|
)
|
||||||
|
|
||||||
# Wait for the asyncio tasks to complete
|
async def main():
|
||||||
process.join()
|
# Start bot initialization in background
|
||||||
|
bot_task = asyncio.create_task(init_bot())
|
||||||
|
|
||||||
|
# Start server in a separate process
|
||||||
|
server_process = Process(target=run_server)
|
||||||
|
server_process.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Wait for bot initialization
|
||||||
|
await bot_task
|
||||||
|
# Keep main process running
|
||||||
|
await asyncio.Event().wait()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.info("Shutting down...")
|
||||||
|
finally:
|
||||||
|
server_process.terminate()
|
||||||
|
server_process.join()
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
asyncio.run(main())
|
@ -1,7 +1,7 @@
|
|||||||
import asyncio
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
import asyncio
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
from flask import Flask, jsonify, request, render_template, redirect, url_for
|
from flask import Flask, jsonify, request, render_template, redirect, url_for
|
||||||
# from flask_oauthlib.client import OAuth
|
# from flask_oauthlib.client import OAuth
|
||||||
@ -23,6 +23,7 @@ def init_app(tr_handler=None):
|
|||||||
global on_transaction
|
global on_transaction
|
||||||
on_transaction = tr_handler
|
on_transaction = tr_handler
|
||||||
app = Flask(__name__, template_folder='../templates', static_folder='../static')
|
app = Flask(__name__, template_folder='../templates', static_folder='../static')
|
||||||
|
executor = ThreadPoolExecutor(max_workers=10) # Adjust the number of workers as needed
|
||||||
login_manager = LoginManager(app)
|
login_manager = LoginManager(app)
|
||||||
login_manager.login_view = 'login'
|
login_manager.login_view = 'login'
|
||||||
|
|
||||||
@ -130,7 +131,7 @@ def init_app(tr_handler=None):
|
|||||||
|
|
||||||
# await process_wh(request_data)
|
# await process_wh(request_data)
|
||||||
# don't wait for the process to finish
|
# don't wait for the process to finish
|
||||||
asyncio.create_task(process_wh(request_data ))
|
executor.submit(asyncio.run, process_wh(request_data))
|
||||||
return jsonify({"status": "OK"}), 200
|
return jsonify({"status": "OK"}), 200
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Error processing webhook: {e}")
|
logging.error(f"Error processing webhook: {e}")
|
||||||
@ -138,7 +139,7 @@ def init_app(tr_handler=None):
|
|||||||
|
|
||||||
# Flask route to retry processing the last log
|
# Flask route to retry processing the last log
|
||||||
|
|
||||||
async def process_wh( data):
|
async def process_wh(data):
|
||||||
global on_transaction
|
global on_transaction
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -379,4 +380,4 @@ def get_latest_log_file(wh:bool):
|
|||||||
utils.log.error(f"Error fetching latest log file: {e}")
|
utils.log.error(f"Error fetching latest log file: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
export = init_app
|
export = init_app
|
||||||
|
Loading…
x
Reference in New Issue
Block a user