"""Flask web UI and webhook endpoints for the Solana copy-trading service.

``init_app`` builds the Flask application: webhook / transaction notification
endpoints, log replay endpoints, and the login-protected dashboard routes.
"""
import asyncio
import json
import logging
import os
import secrets
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

from flask import Flask, jsonify, request, render_template, redirect, url_for
# from flask_oauthlib.client import OAuth
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user

# from crypto.sol.config import LIQUIDITY_TOKENS
from config import LIQUIDITY_TOKENS
from modules import storage, utils, SolanaAPI
from modules.utils import async_safe_call, decode_instruction_data

# Optional callback invoked for every detected swap; set by init_app().
on_transaction = None


def _swap_record(wallet, tr, tx_signature, transaction_type="SWAP"):
    """Build a ``storage.Transaction`` from an enriched swap dict.

    ``tr`` must already carry ``symbol_in``/``symbol_out``, ``amount_in``/
    ``amount_out`` and ``value_in_USD``/``value_out_USD``.
    """
    return storage.Transaction(
        wallet=wallet,
        transaction_type=transaction_type,
        symbol_in=tr['symbol_in'],
        amount_in=tr['amount_in'],
        value_in_usd=tr['value_in_USD'],
        symbol_out=tr['symbol_out'],
        amount_out=tr['amount_out'],
        value_out_usd=tr['value_out_USD'],
        tx_signature=tx_signature,
    )


def _webhook_record(wallet, tr, tx_signature):
    """Build the ``storage.Transaction`` stored for a parsed webhook swap.

    Only the "active" side of the trade is recorded: the input amount for a
    BUY, the output amount for anything else. The direction is derived here so
    that SWAP-type events (or events re-fetched without a ``type`` key) no
    longer fail on an undefined ``is_buy``.
    """
    tx_type = tr.get("type", "SWAP")
    is_buy = tx_type == "BUY"
    usd_value = tr.get("swap_amount_usd", 0)
    return storage.Transaction(
        wallet=wallet,
        transaction_type=tx_type,
        symbol_in=tr["token_in"],
        amount_in=tr["amount_in"] if is_buy else 0,
        value_in_usd=usd_value if is_buy else 0,
        symbol_out=tr["token_out"],
        amount_out=tr["amount_out"] if not is_buy else 0,
        value_out_usd=usd_value if not is_buy else 0,
        tx_signature=tx_signature,
    )


def init_app(tr_handler=None):
    """Create and configure the Flask application.

    Args:
        tr_handler: optional callable invoked (through ``async_safe_call``)
            with the parsed swap dict for every webhook swap. When omitted,
            ``SolanaAPI.SAPI.follow_move`` is used instead.

    Returns:
        The configured ``Flask`` application.
    """
    global on_transaction
    on_transaction = tr_handler

    app = Flask(__name__, template_folder='../templates', static_folder='../static')
    executor = ThreadPoolExecutor(max_workers=10)  # Adjust the number of workers as needed

    # A single LoginManager: the original created a second instance afterwards,
    # which silently dropped the login_view setting.
    # NOTE(review): no secret key is configured here; presumably the caller
    # sets app.secret_key before sessions are used - confirm.
    login_manager = LoginManager(app)
    login_manager.login_view = 'login'

    # oauth = OAuth(app)
    # google = oauth.remote_app(
    #     'google',
    #     consumer_key='YOUR_GOOGLE_CLIENT_ID',
    #     consumer_secret='YOUR_GOOGLE_CLIENT_SECRET',
    #     request_token_params={
    #         'scope': 'email'
    #     },
    #     base_url='https://www.googleapis.com/oauth2/v1/',
    #     request_token_url=None,
    #     access_token_method='POST',
    #     access_token_url='https://accounts.google.com/o/oauth2/token',
    #     authorize_url='https://accounts.google.com/o/oauth2/auth',
    # )

    logger = logging.getLogger(__name__)

    # API
    @app.route('/tr/<wallet>/<tx_signature>', methods=['GET', 'POST'])
    async def transaction_notified(wallet, tx_signature):
        """Handle a direct transaction notification for ``wallet``.

        Uses the JSON body as the swap description when present, otherwise
        fetches the details for ``tx_signature``; then notifies, stores the
        transaction and attempts the copytrade.
        """
        try:
            logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}")
            request_data = request.get_json() if request.is_json else None
            if not request_data:
                # Process the transaction
                # tr = await get_swap_transaction_details(tx_signature)
                tr = await SolanaAPI.SAPI.get_transaction_details_info(tx_signature, [])
            else:
                tr = request_data

            # ToDo - probably optimize
            tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in'])
            tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out'])
            prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']])
            tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in']
            tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out']

            notification = (
                f"Got TXN notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n"
            )
            logging.info(notification)
            await utils.telegram_utils.send_telegram_message(notification)

            # Store the notified transaction in the database
            await storage.store_transaction(_swap_record(wallet, tr, tx_signature))

            # Attempt to execute the copytrade transaction
            try:
                await SolanaAPI.SAPI.follow_move(tr)
            except Exception as e:
                # Store the failed copytrade transaction
                await storage.store_transaction(
                    _swap_record(wallet, tr, tx_signature, transaction_type="SWAP_FAIL")
                )
                logging.error(f"Copytrade transaction failed: {e}")
            else:
                # Store the successful copytrade transaction. Kept out of the
                # try body so a storage error is not reported as a failed trade.
                await storage.store_transaction(_swap_record(wallet, tr, tx_signature))

            # ToDo - probably optimize
            # NOTE(review): process_wh calls SolanaAPI.DEX.save_token_info();
            # confirm which object actually owns save_token_info.
            await SolanaAPI.SAPI.save_token_info()
            return jsonify(tr), 200
        except Exception as e:
            logging.error(f"Error processing transaction: {e}")
            return jsonify({"error": "Failed to process transaction"}), 500

    def _log_inner_instructions(meta):
        """Best-effort debug logging of decoded inner instructions.

        Decoding problems are logged and swallowed so that they cannot prevent
        the webhook payload from being processed.
        """
        try:
            for inner_ix in meta.get("innerInstructions", []):
                for instruction in inner_ix.get("instructions", []):
                    decoded = decode_instruction_data(instruction["data"])
                    logger.info(f"Instruction data decoded: {decoded}")
                    # TODO: pattern-match specific instruction types here
                    # (e.g. decoded["instruction_type"] == 1 for a swap);
                    # different DEXes have different parameter layouts.
        except Exception as e:
            logger.warning(f"Could not decode inner instructions: {e}")

    @app.route('/wh', methods=['POST'])
    async def webhook():
        """Receive a webhook payload, dump it to the logs dir and process it.

        Processing is handed to the thread pool so the HTTP response does not
        wait for the copytrade to finish.
        """
        try:
            current_time = datetime.now().strftime("%Y%m%d-%H%M%S")
            logger.info("Processing webhook")
            request_data = request.get_json() if request.is_json else None
            if not request_data:
                return jsonify({"error": "No data in request"}), 400

            first_event = request_data[0]
            if "description" in first_event and first_event["description"]:
                logger.info(first_event["description"])
            else:
                logger.info(f"Webhook data: {request_data}")

            # save dump to <root>/logs/wh_{datetime}.json (used by the replay routes)
            log_dir = os.path.join(SolanaAPI.root_path, 'logs')
            os.makedirs(log_dir, exist_ok=True)
            with open(os.path.join(log_dir, f'wh_{current_time}.json'), 'w') as f:
                json.dump(request_data, f)

            if "meta" in first_event:
                _log_inner_instructions(first_event["meta"])

            # await process_wh(request_data)
            # don't wait for the process to finish: run the coroutine on its
            # own event loop in a worker thread
            executor.submit(asyncio.run, process_wh(request_data))
            return jsonify({"status": "OK"}), 200
        except Exception as e:
            logging.error(f"Error processing webhook: {e}")
            return jsonify({"error": "Failed to process webhook"}), 500

    async def process_wh(data):
        """Parse a webhook payload and, for SWAP events, notify/store/copytrade.

        Only ``data[0]`` is inspected. All errors are logged; nothing is
        raised to the caller and nothing is returned.
        """
        global on_transaction
        try:
            if data[0].get('type') != "SWAP":
                logger.info("wh transaction is not a swap. skipping...")
                return

            swap_event = data[0]['events'].get('swap')
            if not swap_event:
                logging.warning("No swap event found in data")
                return

            # Extract token input details from the first token input
            token_inputs = swap_event.get('tokenInputs', [])
            token_outputs = swap_event.get('tokenOutputs', [])
            tr = {}
            wallet = data[0]['feePayer']  # Using feePayer as the wallet address
            tx_signature = data[0]['signature']
            usdc_mint = LIQUIDITY_TOKENS[0]
            sol_mint = LIQUIDITY_TOKENS[1]

            try:
                # Determine transaction type
                if token_inputs and token_outputs and usdc_mint in [token_inputs[0]["mint"], token_outputs[0]["mint"]]:
                    tr["type"] = "BUY" if token_inputs[0]["mint"] == usdc_mint else "SELL"
                else:
                    tr["type"] = "SWAP"

                # NOTE(review): the native values below are overwritten by the
                # tokenInputs/tokenOutputs handling that follows (both of its
                # branches assign token_* and amount_*); effectively only
                # "type" and "*_decimals" survive. Also, native SOL amounts are
                # usually expressed in lamports (9 decimals) - the 10**6
                # divisor is kept from the original; confirm.
                native_input = swap_event.get('nativeInput', None)
                if native_input:
                    tr["token_in"] = sol_mint
                    tr["amount_in"] = int(native_input["amount"]) / 10**6
                    tr["type"] = "BUY"
                    tr["token_in_decimals"] = 6

                native_output = swap_event.get('nativeOutput', None)
                if native_output:
                    tr["token_out"] = sol_mint
                    tr["amount_out"] = int(native_output["amount"]) / 10**6
                    tr["type"] = "SELL"
                    tr["token_out_decimals"] = 6

                if not token_inputs:
                    logging.info("Assumed USDC as first token. BUY transaction detected")
                    tr["token_in"] = usdc_mint
                    tr["type"] = "BUY"
                    tr["amount_in"] = await calculate_price_amount(token_outputs[0])
                else:
                    token_in = token_inputs[0]
                    tr["token_in"] = token_in["mint"]
                    tr["token_in_decimals"] = get_decimals(token_in)
                    tr["amount_in"] = calculate_amount(token_in)

                if not token_outputs:
                    logging.info("Assumed USDC as second token. SELL transaction detected")
                    tr["token_out"] = usdc_mint
                    tr["type"] = "SELL"
                    tr["amount_out"] = await calculate_price_amount(token_inputs[0])
                else:
                    token_out = token_outputs[0]
                    tr["token_out"] = token_out["mint"]
                    tr["token_out_decimals"] = get_decimals(token_out)
                    # Fix: the output amount was never set on this path, which
                    # made every later tr["amount_out"] access raise KeyError.
                    tr["amount_out"] = calculate_amount(token_out)

                # Store transaction in database
                if tr["type"] in ["BUY", "SELL"]:
                    await storage.store_transaction(_webhook_record(wallet, tr, tx_signature))

                if native_input:  # SOL
                    logger.info(f"Native input (SOL) detected ({native_input['amount']})")

                if not tr["token_in"] or not tr["token_out"] or tr["amount_in"] == 0 or tr["amount_out"] == 0:
                    logging.warning("Incomplete swap details found in logs. Getting details from transaction")
                    tx_signature = data[0].get('signature')
                    logs = data[0].get('logs', [])
                    tr = await SolanaAPI.SAPI.get_transaction_details_info(tx_signature, logs)
            except Exception as e:
                logging.error(f"Error loading transaction token data: {str(e)}")

            # ToDo - probably optimize
            tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in'])
            tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out'])
            prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']])
            tr["token_in_price"] = prices.get(tr['token_in'], 0)
            tr["token_out_price"] = prices.get(tr['token_out'], 0)
            tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in']
            tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out']

            notification = (
                f"Got WH notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['amount_out']} {tr['symbol_out']} ${tr['value_out_USD']}\n"
            )
            logging.info(notification)
            await utils.telegram_utils.send_telegram_message(notification)

            # Store the notified transaction in the database
            try:
                await storage.store_transaction(_webhook_record(wallet, tr, tx_signature))
            except Exception:
                logging.error(traceback.format_exc())

            # Attempt to execute the copytrade transaction
            try:
                if on_transaction:
                    await async_safe_call(on_transaction, tr)
                else:
                    await SolanaAPI.SAPI.follow_move(tr)
            except Exception as e:
                logging.error(f"Copytrade transaction failed: {e}")
            else:
                # Store the successful copytrade transaction. The original
                # passed nine positional arguments here, unlike every other
                # store_transaction call, and any resulting error was logged
                # as a failed copytrade.
                try:
                    await storage.store_transaction(_swap_record(wallet, tr, tx_signature))
                except Exception:
                    logging.error(traceback.format_exc())

            # ToDo - probably optimize
            await SolanaAPI.DEX.save_token_info()
        except Exception as e:
            logging.error(f"Error processing transaction notification: {str(e)}")
            # Log the full traceback for debugging
            logging.error(traceback.format_exc())

    def get_decimals(token_data):
        """Return the decimals recorded in ``token_data["rawTokenAmount"]`` (0 if absent)."""
        # NOTE(review): "decimalFs" looks like a typo of "decimals"; kept
        # verbatim because it is a runtime key - confirm against the payload.
        return token_data["rawTokenAmount"].get("decimals") or token_data["rawTokenAmount"].get("decimalFs", 0)

    def calculate_amount(token_data):
        """Convert the raw integer token amount into a float using its decimals."""
        decimals = get_decimals(token_data)
        token_amount = int(token_data["rawTokenAmount"]["tokenAmount"])
        return float(token_amount / 10**decimals)

    async def calculate_price_amount(token_data, prices=None):
        """Return the token amount multiplied by its price.

        Prices are fetched through ``SolanaAPI.DEX.get_token_prices`` when
        ``prices`` is not supplied. Raises ``KeyError`` if the mint has no
        price entry.
        """
        if not prices:
            prices = await SolanaAPI.DEX.get_token_prices([token_data["mint"]])
        decimals = get_decimals(token_data)
        token_amount = int(token_data["rawTokenAmount"]["tokenAmount"])
        return prices[token_data["mint"]] * token_amount / 10**decimals

    @app.route('/replay_wh', methods=['POST'])
    async def replay_wh():
        """Re-process a previously dumped webhook file from the logs directory.

        Expects a JSON body of the form ``{"filename": "<name>"}``.
        """
        try:
            data = request.get_json() if request.is_json else None
            filename = data.get('filename') if isinstance(data, dict) else None
            if not filename:
                return jsonify({"error": "Filename not provided"}), 400
            # The name comes from the client: accept only a plain file name so
            # it cannot escape the logs directory ("../x", absolute paths).
            if not isinstance(filename, str) or os.path.basename(filename) != filename:
                return jsonify({"error": "Invalid filename"}), 400

            file_path = os.path.join(SolanaAPI.root_path, 'logs', filename)
            if not os.path.isfile(file_path):
                return jsonify({"error": "File not found"}), 404

            with open(file_path, 'r') as f:
                log_data = json.load(f)
            await process_wh(log_data)
            return jsonify({"status": "Replay successful"}), 200
        except Exception as e:
            logging.error(f"Error replaying webhook file: {e}")
            return jsonify({"error": "Failed to replay webhook file"}), 500

    # Flask route to retry processing the last log
    @app.route('/retry-last-log', methods=['GET'])
    async def retry_last_log():
        """Re-process the newest log dump (``?wh=true`` selects webhook dumps)."""
        wh = request.args.get('wh', 'false').lower() == 'true'
        latest_log_file = get_latest_log_file(wh)
        if not latest_log_file:
            return jsonify({"error": "No log files found"}), 404
        try:
            utils.log.info(f"Processing latest log file: {latest_log_file}")
            with open(latest_log_file, 'r') as f:
                log = json.load(f)
            if wh:
                result = await process_wh(log)
            else:
                result = await SolanaAPI.process_log(log)
            return jsonify({
                "file": latest_log_file,
                "status": "Log dump processed successfully",
                "result": result
            }), 200
        except Exception as e:
            utils.log.error(f"Error processing log dump: {e}")
            return jsonify({"error": "Failed to process log"}), 500

    #
    # # # # AUTHENTICATION # # # #
    #
    @app.route('/login/google/authorized')
    def authorized():
        """Google OAuth callback placeholder; currently only redirects to the index."""
        # resp = google.authorized_response()
        # if resp is None or resp.get('access_token') is None:
        #     return 'Access denied: reason={} error={}'.format(
        #         request.args['error_reason'],
        #         request.args['error_description']
        #     )
        # session['google_token'] = (resp['access_token'], '')
        # user_info = google.get('userinfo')
        # user = storage.get_or_create_user(user_info.data['email'], user_info.data['id'])
        # login_user(user)
        return redirect(url_for('index'))

    class User(UserMixin):
        """Minimal Flask-Login user object backed by a storage record."""

        def __init__(self, id, username, email):
            self.id = id
            self.username = username
            self.email = email

    @login_manager.user_loader
    def load_user(user_id):
        """Load the session user from storage; ``None`` when unknown."""
        user_data = storage.get_user_by_id(user_id)
        if user_data:
            return User(id=user_data['id'], username=user_data['username'], email=user_data['email'])
        return None

    @app.route('/')
    def index():
        """Render the landing page."""
        return render_template('index.html')

    @app.route('/login', methods=['GET', 'POST'])
    def login():
        """Show the login form (GET) or authenticate the posted credentials."""
        if request.method == 'POST':
            username = request.form.get('username')
            password = request.form.get('password')
            user = storage.authenticate_user(username, password)
            if user:
                login_user(User(id=user['id'], username=user['username'], email=user['email']))
                return redirect(url_for('dashboard'))
            return render_template('login.html', error='Invalid credentials')
        if request.args.get('google'):
            # Uncomment the following line if Google OAuth is set up
            # return google.authorize(callback=url_for('authorized', _external=True))
            return render_template('login.html', error='Google OAuth not configured')
        return render_template('login.html')

    @app.route('/logout')
    @login_required
    def logout():
        """Log the current user out and return to the index."""
        logout_user()
        return redirect(url_for('index'))

    @app.route('/dashboard')
    @login_required
    def dashboard():
        """Render the dashboard for the logged-in user."""
        return render_template('dashboard.html')

    @app.route('/generate_api_key', methods=['POST'])
    @login_required
    def generate_api_key():
        """Create, store and return a new API key for the current user."""
        api_key = secrets.token_urlsafe(32)
        storage.store_api_key(current_user.id, api_key)
        return jsonify({'api_key': api_key})

    @app.route('/wallet/<wallet_id>/transactions', methods=['GET'])
    @login_required
    def get_transactions(wallet_id):
        """Return the stored transactions of ``wallet_id`` as JSON."""
        transactions = storage.get_transactions(wallet_id)
        return jsonify(transactions)

    @app.route('/wallet/<wallet_id>/holdings', methods=['GET'])
    @login_required
    def get_holdings(wallet_id):
        """Return the stored holdings of ``wallet_id`` as JSON."""
        holdings = storage.get_holdings(wallet_id)
        return jsonify(holdings)

    return app


# Function to find the latest log file
def get_latest_log_file(wh: bool):
    """Return the path of the most recently modified dump in the logs dir.

    Args:
        wh: when true, consider webhook dumps (``wh_*``); otherwise regular
            log dumps (``log_*``, e.g. ``log_20241005_004103_143116.json``).

    Returns:
        The full path of the newest matching file, or ``None`` when there is
        none or the directory cannot be read.
    """
    log_dir = os.path.join(SolanaAPI.root_path, 'logs')
    prefix = 'wh_' if wh else 'log_'
    try:
        candidates = [
            os.path.join(log_dir, name)
            for name in os.listdir(log_dir)
            if name.startswith(prefix) and os.path.isfile(os.path.join(log_dir, name))
        ]
        if not candidates:
            return None
        return max(candidates, key=os.path.getmtime)
    except Exception as e:
        utils.log.error(f"Error fetching latest log file: {e}")
        return None


export = init_app