gogo2/crypto/sol/modules/webui.py
2024-11-18 20:12:58 +02:00

592 lines
24 KiB
Python

import os
import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor
import traceback
from flask import Flask, jsonify, request, render_template, redirect, url_for
# from flask_oauthlib.client import OAuth
from flask_login import (
LoginManager,
UserMixin,
login_user,
login_required,
logout_user,
current_user,
)
import secrets
import json
# from crypto.sol.config import LIQUIDITY_TOKENS
from config import LIQUIDITY_TOKENS, YOUR_WALLET
from modules import storage, utils, SolanaAPI
from modules.utils import async_safe_call, decode_instruction_data
from modules.storage import Storage
import os
import logging
from datetime import datetime
on_transaction = None
def init_app(tr_handler=None):
global on_transaction
on_transaction = tr_handler
app = Flask(__name__, template_folder="../templates", static_folder="../static")
app.secret_key = secrets.token_hex(16)
executor = ThreadPoolExecutor(
max_workers=10
) # Adjust the number of workers as needed
login_manager = LoginManager(app)
login_manager.login_view = "login"
storage = Storage()
# Ensure database connection
async def ensure_storage_connection():
if not storage.is_connected():
await storage.connect()
ensure_storage_connection()
# oauth = OAuth(app)
# google = oauth.remote_app(
# 'google',
# consumer_key='YOUR_GOOGLE_CLIENT_ID',
# consumer_secret='YOUR_GOOGLE_CLIENT_SECRET',
# request_token_params={
# 'scope': 'email'
# },
# base_url='https://www.googleapis.com/oauth2/v1/',
# request_token_url=None,
# access_token_method='POST',
# access_token_url='https://accounts.google.com/o/oauth2/token',
# authorize_url='https://accounts.google.com/o/oauth2/auth',
# )
login_manager = LoginManager()
login_manager.init_app(app)
logger = logging.getLogger(__name__)
# API
@app.route("/tr/<wallet>/<tx_signature>", methods=["GET", "POST"])
async def transaction_notified(wallet, tx_signature):
try:
logger.info(
f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}"
)
request_data = request.get_json() if request.is_json else None
if not request_data:
# Process the transaction
# tr = await get_swap_transaction_details(tx_signature)
tr = await SolanaAPI.SAPI.get_transaction_details_info(tx_signature, [])
else:
tr = request_data
# ToDo - probably optimize
tr["symbol_in"] = await SolanaAPI.SAPI.get_token_metadata_symbol(
tr["token_in"]
)
tr["symbol_out"] = await SolanaAPI.SAPI.get_token_metadata_symbol(
tr["token_out"]
)
prices = await SolanaAPI.DEX.get_token_prices(
[tr["token_in"], tr["token_out"]]
)
tr["value_in_USD"] = prices.get(tr["token_in"], 0) * tr["amount_in"]
tr["value_out_USD"] = prices.get(tr["token_out"], 0) * tr["amount_out"]
notification = f"<b>Got TXN notification:</b>: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n"
logging.info(notification)
await utils.telegram_utils.send_telegram_message(notification)
# Store the notified transaction in the database
# original_transaction = storage.Transaction(
# wallet=wallet,
# transaction_type="SWAP",
# symbol_in=tr['symbol_in'],
# amount_in=tr['amount_in'],
# value_in_usd=tr['value_in_USD'],
# symbol_out=tr['symbol_out'],
# amount_out=tr['amount_out'],
# value_out_usd=tr['value_out_USD'],
# tx_signature=tx_signature
# )
# await storage.store_transaction(original_transaction)
# # Attempt to execute the copytrade transaction
try:
await SolanaAPI.SAPI.follow_move(tr)
# Store the successful copytrade transaction
# follow_transaction = storage.Transaction(
# wallet=wallet,
# transaction_type="SWAP",
# symbol_in=tr['symbol_in'],
# amount_in=tr['amount_in'],
# value_in_usd=tr['value_in_USD'],
# symbol_out=tr['symbol_out'],
# amount_out=tr['amount_out'],
# value_out_usd=tr['value_out_USD'],
# tx_signature=tx_signature
# )
# await storage.store_transaction(follow_transaction)
except Exception as e:
# # Store the failed copytrade transaction
# failed_transaction = storage.Transaction(
# wallet=wallet,
# transaction_type="SWAP_FAIL",
# symbol_in=tr['symbol_in'],
# amount_in=tr['amount_in'],
# value_in_usd=tr['value_in_USD'],
# symbol_out=tr['symbol_out'],
# amount_out=tr['amount_out'],
# value_out_usd=tr['value_out_USD'],
# tx_signature=tx_signature
# )
# await storage.store_transaction(failed_transaction)
logging.error(f"Copytrade transaction failed: {e}")
# ToDo - probably optimize
await SolanaAPI.SAPI.save_token_info()
return jsonify(tr), 200
except Exception as e:
logging.error(f"Error processing transaction: {e}")
return jsonify({"error": "Failed to process transaction"}), 500
@app.route("/wh", methods=["POST"])
async def webhook():
try:
current_time = datetime.now().strftime("%Y%m%d-%H%M%S")
logger.info("Processing webhook")
request_data = request.get_json() if request.is_json else None
if not request_data:
return jsonify({"error": "No data in request"}), 400
if "description" in request_data[0] and request_data[0]["description"]:
logger.info(request_data[0]["description"])
else:
logger.info(f"Webhook data: {request_data}")
# save dump to /cache/last-webhook-{datetime}.json
with open(
os.path.join(SolanaAPI.root_path, "logs", f"wh_{current_time}.json"),
"w",
) as f:
json.dump(request_data, f)
if "meta" in request_data[0]:
meta = request_data[0]["meta"]
# Parse inner instructions
for inner_ix in meta.get("innerInstructions", []):
for instruction in inner_ix.get("instructions", []):
decoded = decode_instruction_data(instruction["data"])
logger.info(f"Instruction data decoded: {decoded}")
# Example of pattern matching for specific instruction types
if (
decoded["instruction_type"] == 1
): # Example: swap instruction
# Parse parameters based on program type
# Different DEXes will have different parameter layouts
pass
# await process_wh(request_data)
# don't wait for the process to finish
executor.submit(asyncio.run, process_wh(request_data))
return jsonify({"status": "OK"}), 200
except Exception as e:
logging.error(f"Error processing webhook: {e}")
return jsonify({"error": "Failed to process webhook"}), 500
# Flask route to retry processing the last log
async def process_wh(data):
global on_transaction
try:
if data[0].get("type") == "SWAP":
swap_event = data[0]["events"].get("swap")
if not swap_event:
logging.warning("No swap event found in data")
return
# Extract token input details from the first token input
token_inputs = swap_event.get("tokenInputs", [])
token_outputs = swap_event.get("tokenOutputs", [])
tr = {}
wallet = data[0]["feePayer"] # Using feePayer as the wallet address
tx_signature = data[0]["signature"]
usdcMint = LIQUIDITY_TOKENS[0]
solMint = LIQUIDITY_TOKENS[1]
try:
# Determine transaction type
if (
token_inputs
and token_outputs
and LIQUIDITY_TOKENS[0]
in [token_inputs[0]["mint"], token_outputs[0]["mint"]]
):
if token_inputs[0]["mint"] == usdcMint:
tr["type"] = "BUY"
else:
tr["type"] = "SELL"
else:
tr["type"] = "SWAP"
if swap_event.get("nativeInput", None):
tr["token_in"] = solMint
tr["amount_in"] = (
int(swap_event.get("nativeInput")["amount"]) / 10**9
)
tr["type"] = "BUY"
tr["token_in_decimals"] = 9
if swap_event.get("nativeOutput", None):
tr["token_out"] = solMint
tr["amount_out"] = (
int(swap_event.get("nativeOutput")["amount"]) / 10**9
)
tr["type"] = "SELL"
tr["token_out_decimals"] = 9
# if we don't have token_in yet
if "token_in" not in tr:
if not token_inputs or len(token_inputs) == 0:
logging.info(
"Assumed USDC as first token. BUY transaction detected"
)
tr["token_in"] = usdcMint
tr["type"] = "BUY"
tr["amount_in"] = await calculate_price_amount(
token_outputs[0]
)
else:
token_in = token_inputs[0]
tr["token_in"] = token_in["mint"]
tr["token_in_decimals"] = get_decimals(token_in)
tr["amount_in"] = calculate_amount(token_in)
# if we don't have token_out yet
if "token_out" not in tr:
if not token_outputs or len(token_outputs) == 0:
logging.info(
"Assumed USDC as second token. SELL transaction detected"
)
tr["token_out"] = usdcMint
tr["type"] = "SELL"
tr["amount_out"] = await calculate_price_amount(
token_inputs[0]
)
else:
token_out = token_outputs[0]
tr["token_out"] = token_out["mint"]
tr["token_out_decimals"] = get_decimals(token_out)
tr["amount_out"] = calculate_amount(token_out)
# Store transaction in database
if tr["type"] in ["BUY", "SELL"]:
is_buy = tr["type"] == "BUY"
# transaction = storage.Transaction(
# wallet=wallet,
# transaction_type=tr["type"],
# symbol_in=tr["token_in"],
# amount_in=tr["amount_in"] if is_buy else 0,
# value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0,
# symbol_out=tr["token_out"],
# amount_out=tr["amount_out"] if not is_buy else 0,
# value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0,
# tx_signature=tx_signature
# )
# await storage.store_transaction(transaction)
if swap_event.get("nativeInput"): # SOL
token_in = swap_event.get("nativeInput", [])
logger.info(
f"Native input (SOL) detected ({token_in["amount"]})"
)
if (
not tr["token_in"]
or not tr["token_out"]
or tr["amount_in"] == 0
or tr["amount_out"] == 0
):
logging.warning(
"Incomplete swap details found in logs. Getting details from transaction"
)
tx_signature = data[0].get("signature")
logs = data[0].get("logs", [])
tr = await SolanaAPI.SAPI.get_transaction_details_info(
tx_signature, logs
)
except Exception as e:
logging.error(f"Error loading transaction token data: {str(e)}")
# ToDo - probably optimize
tr["symbol_in"] = await SolanaAPI.SAPI.get_token_metadata_symbol(
tr["token_in"]
)
tr["symbol_out"] = await SolanaAPI.SAPI.get_token_metadata_symbol(
tr["token_out"]
)
# ToDo - optimize
# prices = await SolanaAPI.DEX.get_token_prices(
# [tr["token_in"], tr["token_out"]]
# )
# tr["token_in_price"] = prices.get(tr["token_in"], 0)
# tr["token_out_price"] = prices.get(tr["token_out"], 0)
# tr["value_in_USD"] = prices.get(tr["token_in"], 0) * tr["amount_in"]
# tr["value_out_USD"] = prices.get(tr["token_out"], 0) * tr["amount_out"]
# notification = f"<b>Got WH notification:</b>: {tr['amount_in']} {tr['symbol_in'] or tr["token_in"]} swapped for {tr['amount_out']} {tr['symbol_out']} ${tr['value_out_USD']}\n"
# logging.info(notification)
# await utils.telegram_utils.send_telegram_message(notification)
# Store the notified transaction in the database
# storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
# copyTransaction = storage.Transaction(
# wallet=wallet,
# transaction_type=tr["type"],
# symbol_in=tr["token_in"],
# amount_in=tr["amount_in"] if is_buy else 0,
# value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0,
# symbol_out=tr["token_out"],
# amount_out=tr["amount_out"] if not is_buy else 0,
# value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0,
# tx_signature=tx_signature
# )
# try: await storage.store_transaction(copyTransaction)
# except: logging.error(traceback.format_exc())
# Attempt to execute the copytrade transaction
try:
# await SolanaAPI.SAPI.follow_move(tr)
if on_transaction:
await async_safe_call(on_transaction, tr)
else:
await SolanaAPI.SAPI.follow_move(tr)
# Store the successful copytrade transaction
# await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
except Exception as e:
# Store the failed copytrade transaction
# await storage.store_transaction(wallet, "SWAP_FAIL", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
logging.error(f"Copytrade transaction failed: {e}")
# ToDo - probably optimize
await SolanaAPI.DEX.save_token_info()
else:
logger.info("wh transaction is not a swap. skipping...")
except Exception as e:
logging.error(f"Error processing transaction notification: {str(e)}")
# Log the full traceback for debugging
logging.error(traceback.format_exc())
def get_decimals(token_data):
return token_data["rawTokenAmount"].get("decimals") or token_data[
"rawTokenAmount"
].get("decimalFs", 0)
def calculate_amount(token_data):
decimals = get_decimals(token_data)
token_amount = int(token_data["rawTokenAmount"]["tokenAmount"])
return float(token_amount / 10**decimals)
async def calculate_price_amount(token_data, prices=None):
if not prices:
prices = await SolanaAPI.DEX.get_token_prices([token_data["mint"]])
decimals = get_decimals(token_data)
token_amount = int(token_data["rawTokenAmount"]["tokenAmount"])
return prices[token_data["mint"]] * token_amount / 10**decimals
@app.route("/replay_wh", methods=["POST"])
async def replay_wh():
try:
data = request.get_json()
filename = data.get("filename")
if not filename:
return jsonify({"error": "Filename not provided"}), 400
file_path = os.path.join(SolanaAPI.root_path, "logs", filename)
if not os.path.exists(file_path):
return jsonify({"error": "File not found"}), 404
with open(file_path, "r") as f:
log_data = json.load(f)
await process_wh(log_data)
return jsonify({"status": "Replay successful"}), 200
except Exception as e:
logging.error(f"Error replaying webhook file: {e}")
return jsonify({"error": "Failed to replay webhook file"}), 500
@app.route("/retry-last-log", methods=["GET"])
async def retry_last_log():
wh = request.args.get("wh", "false").lower() == "true"
latest_log_file = get_latest_log_file(wh)
if not latest_log_file:
return jsonify({"error": "No log files found"}), 404
try:
utils.log.info(f"Processing latest log file: {latest_log_file}")
with open(latest_log_file, "r") as f:
log = json.load(f)
if wh:
result = await process_wh(log)
else:
result = await SolanaAPI.process_log(log)
return (
jsonify(
{
"file": latest_log_file,
"status": "Log dump processed successfully",
"result": result,
}
),
200,
)
except Exception as e:
utils.log.error(f"Error processing log dump: {e}")
return jsonify({"error": "Failed to process log"}), 500
# # # #
# AUTHENTICATION
# # # #
@app.route("/login/google/authorized")
def authorized():
# resp = google.authorized_response()
# if resp is None or resp.get('access_token') is None:
# return 'Access denied: reason={} error={}'.format(
# request.args['error_reason'],
# request.args['error_description']
# )
# session['google_token'] = (resp['access_token'], '')
# user_info = google.get('userinfo')
# user = storage.get_or_create_user(user_info.data['email'], user_info.data['id'])
# login_user(user)
return redirect(url_for("index"))
class User(UserMixin):
def __init__(self, id, username, email):
self.id = id
self.username = username
self.email = email
@login_manager.user_loader
def load_user(user_id):
user_data = storage.get_user_by_id(user_id)
if user_data:
return User(
id=user_data["id"],
username=user_data["username"],
email=user_data["email"],
)
return None
@app.route("/")
def index():
return render_template("index.html")
@login_manager.unauthorized_handler
def unauthorized():
return redirect("/login?next=" + request.path)
# return jsonify({'error': 'Unauthorized'}), 401
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
user = storage.authenticate_user(username, password)
if user:
login_user(
User(id=user["id"], username=user["username"], email=user["email"])
)
return redirect(url_for("dashboard"))
else:
return render_template("login.html", error="Invalid credentials")
elif request.args.get("google"):
# Uncomment the following line if Google OAuth is set up
# return google.authorize(callback=url_for('authorized', _external=True))
return render_template("login.html", error="Google OAuth not configured")
return render_template("login.html")
@app.route("/logout")
@login_required
def logout():
logout_user()
return redirect(url_for("index"))
@app.route("/dashboard")
@login_required
def dashboard():
return render_template("dashboard.html")
@app.route("/generate_api_key", methods=["POST"])
@login_required
def generate_api_key():
api_key = secrets.token_urlsafe(32)
storage.store_api_key(current_user.id, api_key)
return jsonify({"api_key": api_key})
@app.route("/wallet/<int:wallet_id>/transactions", methods=["GET"])
@login_required
@login_required
def get_transactions(wallet_id):
transactions = storage.get_transactions(wallet_id)
return jsonify(transactions)
@app.route("/wallet/<int:wallet_id>/holdings", methods=["GET"])
@login_required
@login_required
def get_holdings(wallet_id):
holdings = storage.get_holdings(wallet_id)
return jsonify(holdings)
return app
def teardown_app():
# Close the database connection
storage.disconnect()
# Function to find the latest log file
def get_latest_log_file(wh: bool):
log_dir = os.path.join(SolanaAPI.root_path, "logs")
try:
# files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))]
# filter files mask log_20241005_004103_143116.json
if wh:
files = [
f
for f in os.listdir(log_dir)
if os.path.isfile(os.path.join(log_dir, f)) and f.startswith("wh_")
]
else:
files = [
f
for f in os.listdir(log_dir)
if os.path.isfile(os.path.join(log_dir, f)) and f.startswith("log_")
]
latest_file = max(
files, key=lambda x: os.path.getmtime(os.path.join(log_dir, x))
)
return os.path.join(log_dir, latest_file)
except Exception as e:
utils.log.error(f"Error fetching latest log file: {e}")
return None
export = init_app, teardown_app