gogo2/crypto/sol/modules/webui.py
2024-10-21 17:56:38 +03:00

179 lines
6.3 KiB
Python

import json
import logging
import os
import secrets

from flask import Flask, jsonify, request, render_template, redirect, url_for
# from flask_oauthlib.client import OAuth
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user

from modules import storage
def init_app():
    """Build and configure the Flask web UI application.

    Creates the Flask app, attaches a Flask-Login ``LoginManager`` and
    registers the login/logout, dashboard, API-key, wallet and webhook routes
    as closures over the local ``app``.

    Returns:
        Flask: the configured application instance.
    """
    # The route handlers log through ``logger``; this module defines no
    # module-level logger, so create one here for the closures to capture.
    logger = logging.getLogger(__name__)

    app = Flask(__name__, template_folder='../templates', static_folder='../static')

    # Never sign sessions with a hard-coded key. Take it from the environment;
    # the random fallback keeps development working but invalidates sessions
    # on every restart and is not shared between worker processes.
    app.config['SECRET_KEY'] = os.environ.get('FLASK_SECRET_KEY') or secrets.token_hex(32)

    # Exactly one LoginManager: creating a second one (as this code used to)
    # replaces the first and silently drops ``login_view``, so unauthenticated
    # users would get a bare 401 instead of a redirect to the login page.
    login_manager = LoginManager(app)
    login_manager.login_view = 'login'

    # Google OAuth is currently disabled. Original configuration kept for
    # reference:
    # oauth = OAuth(app)
    # google = oauth.remote_app(
    #     'google',
    #     consumer_key='YOUR_GOOGLE_CLIENT_ID',
    #     consumer_secret='YOUR_GOOGLE_CLIENT_SECRET',
    #     request_token_params={
    #         'scope': 'email'
    #     },
    #     base_url='https://www.googleapis.com/oauth2/v1/',
    #     request_token_url=None,
    #     access_token_method='POST',
    #     access_token_url='https://accounts.google.com/o/oauth2/token',
    #     authorize_url='https://accounts.google.com/o/oauth2/auth',
    # )

    @app.route('/login/google/authorized')
    def authorized():
        """OAuth callback placeholder; redirects to the index page."""
        # Disabled together with the OAuth client above:
        # resp = google.authorized_response()
        # if resp is None or resp.get('access_token') is None:
        #     return 'Access denied: reason={} error={}'.format(
        #         request.args['error_reason'],
        #         request.args['error_description']
        #     )
        # session['google_token'] = (resp['access_token'], '')
        # user_info = google.get('userinfo')
        # user = storage.get_or_create_user(user_info.data['email'], user_info.data['id'])
        # login_user(user)
        return redirect(url_for('index'))

    class User(UserMixin):
        """Session user object handed to Flask-Login."""

        def __init__(self, id, username, email):
            self.id = id
            self.username = username
            self.email = email

    @login_manager.user_loader
    def load_user(user_id):
        """Rebuild the ``User`` for a session id, or return None if unknown."""
        user_data = storage.get_user_by_id(user_id)
        if user_data:
            return User(id=user_data['id'], username=user_data['username'], email=user_data['email'])
        return None

    @app.route('/')
    def index():
        """Render the landing page."""
        return render_template('index.html')

    @app.route('/login', methods=['GET', 'POST'])
    def login():
        """Show the login form (GET) or authenticate form credentials (POST)."""
        if request.method == 'POST':
            username = request.form.get('username')
            password = request.form.get('password')
            # Reject missing/empty fields up front instead of passing None
            # into the storage layer.
            user = storage.authenticate_user(username, password) if username and password else None
            if user:
                login_user(User(id=user['id'], username=user['username'], email=user['email']))
                return redirect(url_for('dashboard'))
            return render_template('login.html', error='Invalid credentials')

        if request.args.get('google'):
            # The OAuth client is commented out above, so the former
            # ``google.authorize(...)`` call raised NameError. Fail cleanly
            # until OAuth is re-enabled.
            return render_template('login.html', error='Google login is not available'), 501

        return render_template('login.html')

    @app.route('/logout')
    @login_required
    def logout():
        """End the current session and return to the index page."""
        logout_user()
        return redirect(url_for('index'))

    @app.route('/dashboard')
    @login_required
    def dashboard():
        """Render the dashboard for the logged-in user."""
        return render_template('dashboard.html')

    @app.route('/generate_api_key', methods=['POST'])
    @login_required
    def generate_api_key():
        """Create a new random API key, store it for the current user, return it."""
        api_key = secrets.token_urlsafe(32)
        storage.store_api_key(current_user.id, api_key)
        return jsonify({'api_key': api_key})

    # NOTE(review): the two wallet routes below only check that *some* user is
    # logged in, not that ``wallet_id`` belongs to ``current_user``. Confirm
    # whether the storage layer offers an ownership check and add it here.
    @app.route('/wallet/<int:wallet_id>/transactions', methods=['GET'])
    @login_required
    def get_transactions(wallet_id):
        """Return the stored transactions of ``wallet_id`` as JSON."""
        transactions = storage.get_transactions(wallet_id)
        return jsonify(transactions)

    @app.route('/wallet/<int:wallet_id>/holdings', methods=['GET'])
    @login_required
    def get_holdings(wallet_id):
        """Return the stored holdings of ``wallet_id`` as JSON."""
        holdings = storage.get_holdings(wallet_id)
        return jsonify(holdings)

    # NOTE(review): ``process_log``, ``get_transaction_details_info``,
    # ``get_token_metadata_symbol``, ``follow_move`` and ``save_token_info``
    # are neither defined nor imported in the visible part of this module;
    # until they are, the handlers below answer 500 via their except branch.

    # Flask route to retry processing the last log.
    # NOTE(review): unauthenticated GET with side effects - confirm intended.
    @app.route('/retry', methods=['GET'])
    @app.route('/retry-last-log', methods=['GET'])
    async def retry_last_log():
        """Re-process the most recently written log dump and report the result."""
        latest_log_file = get_latest_log_file()
        if not latest_log_file:
            return jsonify({"error": "No log files found"}), 404
        try:
            logger.info(f"Processing latest log file: {latest_log_file}")
            with open(latest_log_file, 'r', encoding='utf-8') as f:
                log = json.load(f)
            result = await process_log(log)
            return jsonify({
                "file": latest_log_file,
                "status": "Log dump processed successfully",
                "result": result
            }), 200
        except Exception as e:
            # Top-level request boundary: log with traceback, answer 500.
            logger.exception(f"Error processing log dump: {e}")
            return jsonify({"error": "Failed to process log"}), 500

    # Webhook target; the JS caller builds the path as
    # `/tr/${followedWallet.toBase58()}/${logs.signature}`.
    @app.route('/tr/<wallet>/<tx_signature>', methods=['GET', 'POST'])
    async def transaction_notified(wallet, tx_signature):
        """Handle a notification about a followed wallet's transaction."""
        try:
            logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}")
            # Process the transaction
            # tr = await get_swap_transaction_details(tx_signature)
            tr = await get_transaction_details_info(tx_signature, [])
            # ToDo - probably optimize
            await get_token_metadata_symbol(tr['token_in'])
            await get_token_metadata_symbol(tr['token_out'])
            await follow_move(tr)
            await save_token_info()
            return jsonify(tr), 200
        except Exception as e:
            # Top-level request boundary: log with traceback, answer 500.
            logger.exception(f"Error processing transaction: {e}")
            return jsonify({"error": "Failed to process transaction"}), 500

    return app
# Function to find the latest log file
def get_latest_log_file(log_dir='./logs'):
    """Return the path of the most recently modified ``log_*`` file.

    Args:
        log_dir: Directory to scan. Defaults to ``./logs``, resolved against
            the current working directory.

    Returns:
        ``os.path.join(log_dir, name)`` for the regular file whose name starts
        with ``log_`` (e.g. ``log_20241005_004103_143116.json``) and whose
        modification time is newest, or ``None`` when the directory cannot be
        read or contains no such file.
    """
    try:
        candidates = [
            os.path.join(log_dir, name)
            for name in os.listdir(log_dir)
            if name.startswith('log_')
        ]
        log_files = [path for path in candidates if os.path.isfile(path)]
        # default=None: "no log files yet" is an expected state, not an error;
        # without it max() raises ValueError on an empty list.
        return max(log_files, key=os.path.getmtime, default=None)
    except OSError as e:
        # Missing/unreadable directory, or a file vanished between listing
        # and stat - report and let the caller treat it as "nothing found".
        logging.getLogger(__name__).error(f"Error fetching latest log file: {e}")
        return None