"""Flask application factory exposing transaction/webhook endpoints and a small login-protected UI."""

from flask import Flask, jsonify, request, render_template, redirect, url_for
# from flask_oauthlib.client import OAuth
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
import secrets
import json
import inspect
import traceback
from modules import storage, utils, SolanaAPI
from modules.utils import async_safe_call
import os
import logging
from datetime import datetime

# Optional callback invoked for every swap received through the webhook.
# Set by init_app(); when left as None, SolanaAPI.SAPI.follow_move() is used.
on_transaction = None


async def _await_if_needed(result):
    """Await *result* when it is awaitable, otherwise return it unchanged.

    The original code awaited ``storage.store_transaction`` in one handler and
    called it without ``await`` in another. Whether it is a coroutine function
    is not visible from this file, so both call styles are supported here.
    """
    if inspect.isawaitable(result):
        return await result
    return result


async def _store_swap(wallet, status, tr, tx_signature):
    """Persist one swap record through ``storage.store_transaction``.

    Args:
        wallet: Wallet address the transaction belongs to.
        status: Record type passed to storage ("SWAP" or "SWAP_FAIL").
        tr: Transaction dict; must already contain the ``symbol_*``,
            ``amount_*`` and ``value_*_USD`` keys.
        tx_signature: Signature of the transaction.
    """
    await _await_if_needed(storage.store_transaction(
        wallet, status,
        tr['symbol_in'], tr['amount_in'], tr['value_in_USD'],
        tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],
        tx_signature,
    ))


def init_app(tr_handler=None):
    """Create and configure the Flask application.

    Args:
        tr_handler: Optional callable invoked with the transaction dict for
            every swap received via the webhook. When omitted,
            ``SolanaAPI.SAPI.follow_move`` is called instead.

    Returns:
        The configured ``Flask`` application with all routes registered.
    """
    global on_transaction
    on_transaction = tr_handler
    app = Flask(__name__, template_folder='../templates', static_folder='../static')

    # oauth = OAuth(app)
    # google = oauth.remote_app(
    #     'google',
    #     consumer_key='YOUR_GOOGLE_CLIENT_ID',
    #     consumer_secret='YOUR_GOOGLE_CLIENT_SECRET',
    #     request_token_params={
    #         'scope': 'email'
    #     },
    #     base_url='https://www.googleapis.com/oauth2/v1/',
    #     request_token_url=None,
    #     access_token_method='POST',
    #     access_token_url='https://accounts.google.com/o/oauth2/token',
    #     authorize_url='https://accounts.google.com/o/oauth2/auth',
    # )

    # A single LoginManager: previously a second instance replaced the first
    # one and silently dropped the login_view setting.
    login_manager = LoginManager()
    login_manager.login_view = 'login'
    login_manager.init_app(app)

    logger = logging.getLogger(__name__)

    # API
    @app.route('/tr/<wallet>/<tx_signature>', methods=['GET', 'POST'])
    async def transaction_notified(wallet, tx_signature):
        """Handle a transaction notification for *wallet* / *tx_signature*.

        Uses the JSON request body as the transaction when present, otherwise
        fetches the details via ``SolanaAPI``. Stores the transaction, tries to
        copy the trade and returns the enriched transaction as JSON
        (HTTP 500 on any failure).
        """
        try:
            logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}")
            request_data = request.get_json() if request.is_json else None
            if not request_data:
                # Process the transaction
                # tr = await get_swap_transaction_details(tx_signature)
                tr = await SolanaAPI.SAPI.get_transaction_details_info(tx_signature, [])
            else:
                tr = request_data

            # ToDo - probably optimize
            tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in'])
            tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out'])

            prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']])
            tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in']
            tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out']

            notification = (
                f"Got TXN notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n"
            )
            logging.info(notification)
            await utils.telegram_utils.send_telegram_message(notification)

            # Store the notified transaction in the database
            await _store_swap(wallet, "SWAP", tr, tx_signature)

            # Attempt to execute the copytrade transaction
            try:
                await SolanaAPI.SAPI.follow_move(tr)
                # Store the successful copytrade transaction
                await _store_swap(wallet, "SWAP", tr, tx_signature)
            except Exception as e:
                # Store the failed copytrade transaction
                await _store_swap(wallet, "SWAP_FAIL", tr, tx_signature)
                logging.error(f"Copytrade transaction failed: {e}")

            # ToDo - probably optimize
            # NOTE(review): the webhook path calls SolanaAPI.DEX.save_token_info()
            # while this one uses SAPI - confirm which object owns the method.
            await SolanaAPI.SAPI.save_token_info()

            return jsonify(tr), 200
        except Exception as e:
            logging.error(f"Error processing transaction: {e}")
            return jsonify({"error": "Failed to process transaction"}), 500

    @app.route('/wh', methods=['POST'])
    async def webhook():
        """Receive a webhook payload, dump it to the logs directory and process it.

        Returns HTTP 400 when the request carries no JSON data and HTTP 500
        when dumping or processing raises.
        """
        try:
            current_time = datetime.now().strftime("%Y%m%d-%H%M%S")
            logger.info("Processing webhook")
            request_data = request.get_json() if request.is_json else None
            if not request_data:
                return jsonify({"error": "No data in request"}), 400
            logger.info(f"Webhook data: {request_data}")

            # Save a dump to <root_path>/logs/wh_<timestamp>.json so that it
            # can be replayed later through /retry?wh=true.
            log_dir = os.path.join(SolanaAPI.root_path, 'logs')
            os.makedirs(log_dir, exist_ok=True)
            with open(os.path.join(log_dir, f'wh_{current_time}.json'), 'w') as f:
                json.dump(request_data, f)

            # process_wh is a coroutine function: without the await it would
            # never actually run.
            await process_wh(request_data)
            return jsonify({"status": "Webhook processed"}), 200
        except Exception as e:
            logging.error(f"Error processing webhook: {e}")
            return jsonify({"error": "Failed to process webhook"}), 500

    async def process_wh(data):
        """Process a webhook payload; only the first element of *data* is examined.

        For an element of type "SWAP" the swap details are extracted, enriched
        with symbols and USD values, announced via Telegram, stored, and handed
        to ``on_transaction`` (or ``SolanaAPI.SAPI.follow_move`` when no
        handler is set). All exceptions are logged and swallowed, so the
        function always returns ``None``.
        """
        try:
            if data[0].get('type') == "SWAP":
                swap_event = data[0]['events'].get('swap')
                if not swap_event:
                    logging.warning("No swap event found in data")
                    return

                # Extract token input details from the first token input
                token_inputs = swap_event.get('tokenInputs', [])
                token_outputs = swap_event.get('tokenOutputs', [])
                if not token_inputs or not token_outputs:
                    logging.warning("Missing token inputs or outputs")
                    return

                raw_in = token_inputs[0]['rawTokenAmount']
                raw_out = token_outputs[0]['rawTokenAmount']
                tr = {
                    'token_in': token_inputs[0]['mint'],
                    'token_out': token_outputs[0]['mint'],
                    # Raw amounts are scaled down by the token's decimals.
                    'amount_in': float(raw_in['tokenAmount']) / 10**raw_in['decimals'],
                    'amount_out': float(raw_out['tokenAmount']) / 10**raw_out['decimals'],
                }

                if not tr["token_in"] or not tr["token_out"] or tr["amount_in"] == 0 or tr["amount_out"] == 0:
                    logging.warning("Incomplete swap details found in logs. Getting details from transaction")
                    tx_signature = data[0].get('signature')
                    logs = data[0].get('logs', [])
                    tr = await SolanaAPI.SAPI.get_transaction_details_info(tx_signature, logs)

                wallet = data[0]['feePayer']  # Using feePayer as the wallet address
                tx_signature = data[0]['signature']

                # ToDo - probably optimize
                tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in'])
                tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out'])

                prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']])
                tr["token_in_price"] = prices.get(tr['token_in'], 0)
                tr["token_out_price"] = prices.get(tr['token_out'], 0)
                tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in']
                tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out']

                notification = (
                    f"Got WH notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n"
                )
                logging.info(notification)
                await utils.telegram_utils.send_telegram_message(notification)

                # Store the notified transaction in the database
                await _store_swap(wallet, "SWAP", tr, tx_signature)

                # Attempt to execute the copytrade transaction
                try:
                    # await SolanaAPI.SAPI.follow_move(tr)
                    if on_transaction:
                        await async_safe_call(on_transaction, tr)
                    else:
                        await SolanaAPI.SAPI.follow_move(tr)
                    # Store the successful copytrade transaction
                    await _store_swap(wallet, "SWAP", tr, tx_signature)
                except Exception as e:
                    # Store the failed copytrade transaction
                    await _store_swap(wallet, "SWAP_FAIL", tr, tx_signature)
                    logging.error(f"Copytrade transaction failed: {e}")

                # ToDo - probably optimize
                # NOTE(review): the /tr route calls SolanaAPI.SAPI.save_token_info()
                # while this one uses DEX - confirm which object owns the method.
                await SolanaAPI.DEX.save_token_info()
        except Exception as e:
            logging.error(f"Error processing transaction notification: {str(e)}")
            # Log the full traceback for debugging
            logging.error(traceback.format_exc())

    # Flask route to retry processing the last log
    @app.route('/retry', methods=['GET'])
    @app.route('/retry-last-log', methods=['GET'])
    async def retry_last_log():
        """Re-process the most recently modified dump in the logs directory.

        With ``?wh=true`` the latest ``wh_*`` dump is replayed through
        ``process_wh``; otherwise the latest ``log_*`` dump goes through
        ``SolanaAPI.process_log``. Returns HTTP 404 when no dump exists.
        """
        wh = request.args.get('wh', 'false').lower() == 'true'
        latest_log_file = get_latest_log_file(wh)
        if not latest_log_file:
            return jsonify({"error": "No log files found"}), 404

        try:
            utils.log.info(f"Processing latest log file: {latest_log_file}")
            with open(latest_log_file, 'r') as f:
                log = json.load(f)

            if wh:
                result = await process_wh(log)
            else:
                result = await SolanaAPI.process_log(log)

            return jsonify({
                "file": latest_log_file,
                "status": "Log dump processed successfully",
                "result": result
            }), 200
        except Exception as e:
            utils.log.error(f"Error processing log dump: {e}")
            return jsonify({"error": "Failed to process log"}), 500

    # # # #
    # AUTHENTICATION
    # # # #

    @app.route('/login/google/authorized')
    def authorized():
        """OAuth callback placeholder; currently only redirects to the index page."""
        # resp = google.authorized_response()
        # if resp is None or resp.get('access_token') is None:
        #     return 'Access denied: reason={} error={}'.format(
        #         request.args['error_reason'],
        #         request.args['error_description']
        #     )
        # session['google_token'] = (resp['access_token'], '')
        # user_info = google.get('userinfo')
        # user = storage.get_or_create_user(user_info.data['email'], user_info.data['id'])
        # login_user(user)
        return redirect(url_for('index'))

    class User(UserMixin):
        """Minimal Flask-Login user holding id, username and email."""

        def __init__(self, id, username, email):
            self.id = id
            self.username = username
            self.email = email

    @login_manager.user_loader
    def load_user(user_id):
        """Return the ``User`` for *user_id*, or ``None`` when storage has no such user."""
        user_data = storage.get_user_by_id(user_id)
        if user_data:
            return User(id=user_data['id'], username=user_data['username'], email=user_data['email'])
        return None

    @app.route('/')
    def index():
        """Render the landing page."""
        return render_template('index.html')

    @app.route('/login', methods=['GET', 'POST'])
    def login():
        """Show the login form (GET) or authenticate the submitted credentials (POST)."""
        if request.method == 'POST':
            username = request.form.get('username')
            password = request.form.get('password')
            user = storage.authenticate_user(username, password)
            if user:
                login_user(User(id=user['id'], username=user['username'], email=user['email']))
                return redirect(url_for('dashboard'))
            else:
                return render_template('login.html', error='Invalid credentials')
        elif request.args.get('google'):
            # Uncomment the following line if Google OAuth is set up
            # return google.authorize(callback=url_for('authorized', _external=True))
            return render_template('login.html', error='Google OAuth not configured')
        return render_template('login.html')

    @app.route('/logout')
    @login_required
    def logout():
        """Log the current user out and redirect to the index page."""
        logout_user()
        return redirect(url_for('index'))

    @app.route('/dashboard')
    @login_required
    def dashboard():
        """Render the dashboard for a logged-in user."""
        return render_template('dashboard.html')

    @app.route('/generate_api_key', methods=['POST'])
    @login_required
    def generate_api_key():
        """Generate a new API key, store it for the current user and return it as JSON."""
        api_key = secrets.token_urlsafe(32)
        storage.store_api_key(current_user.id, api_key)
        return jsonify({'api_key': api_key})

    @app.route('/wallet/<wallet_id>/transactions', methods=['GET'])
    @login_required
    def get_transactions(wallet_id):
        """Return the stored transactions of *wallet_id* as JSON."""
        transactions = storage.get_transactions(wallet_id)
        return jsonify(transactions)

    @app.route('/wallet/<wallet_id>/holdings', methods=['GET'])
    @login_required
    def get_holdings(wallet_id):
        """Return the stored holdings of *wallet_id* as JSON."""
        holdings = storage.get_holdings(wallet_id)
        return jsonify(holdings)

    return app


# Function to find the latest log file
def get_latest_log_file(wh: bool):
    """Return the path of the most recently modified dump in the logs directory.

    Args:
        wh: When true, consider webhook dumps (``wh_*``); otherwise regular
            log dumps (``log_*``, e.g. ``log_20241005_004103_143116.json``).

    Returns:
        Full path of the newest matching file, or ``None`` when there is no
        matching file or the directory cannot be read.
    """
    log_dir = os.path.join(SolanaAPI.root_path, 'logs')
    prefix = 'wh_' if wh else 'log_'
    try:
        candidates = [
            os.path.join(log_dir, name)
            for name in os.listdir(log_dir)
            if name.startswith(prefix) and os.path.isfile(os.path.join(log_dir, name))
        ]
        if not candidates:
            # Nothing to replay is a normal situation, not an error.
            return None
        return max(candidates, key=os.path.getmtime)
    except Exception as e:
        utils.log.error(f"Error fetching latest log file: {e}")
        return None