flask webui looking good

This commit is contained in:
Dobromir Popov
2024-10-13 01:37:10 +03:00
parent 49384accf6
commit a655c5bd88
8 changed files with 257 additions and 165 deletions

View File

@ -1,8 +1,12 @@
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import aiosqlite
import json
from datetime import datetime
DATABASE_FILE = "app_data.db"
DATABASE_FILE = "./app_data.db"
async def init_db():
async with aiosqlite.connect(DATABASE_FILE) as db:
@ -79,60 +83,7 @@ async def store_transaction(wallet_id, transaction_type, sell_currency, sell_amo
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (wallet_id, datetime.now().isoformat(), transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, json.dumps(details or {})))
await db.commit()
# async def get_new_transactions(wallet_address, rpc_url):
# async with AsyncClient(rpc_url) as client:
# last_tx = await get_last_stored_transaction(wallet_address)
# if last_tx:
# last_signature, last_timestamp = last_tx
# else:
# # If no transactions are stored, we'll fetch all transactions
# last_signature = None
# last_timestamp = None
# new_transactions = []
# # Get the transaction history for the wallet
# tx_history = await client.get_signatures_for_address(wallet_address, before=last_signature)
# for tx in tx_history.value:
# # Check if the transaction is newer than the last stored one
# if not last_timestamp or tx.block_time > datetime.fromisoformat(last_timestamp).timestamp():
# # Fetch the full transaction details
# tx_details = await client.get_transaction(tx.signature, commitment=Confirmed)
# new_transactions.append(tx_details)
# return new_transactions
# async def process_new_transactions(wallet_id, wallet_address, rpc_url):
# new_transactions = await get_new_transactions(wallet_address, rpc_url)
# for tx in new_transactions:
# # Process the transaction and extract relevant information
# # This is a placeholder - you'll need to implement the actual logic based on your requirements
# transaction_type = "swap" # Determine the type based on the transaction data
# sell_currency = "SOL" # Extract from transaction data
# sell_amount = 1.0 # Extract from transaction data
# sell_value = 100.0 # Extract from transaction data
# buy_currency = "USDC" # Extract from transaction data
# buy_amount = 100.0 # Extract from transaction data
# buy_value = 100.0 # Extract from transaction data
# solana_signature = tx.transaction.signatures[0]
# # Store the transaction in the database
# await store_transaction(
# wallet_id, transaction_type, sell_currency, sell_amount, sell_value,
# buy_currency, buy_amount, buy_value, solana_signature
# )
# # Update holdings
# await update_holdings(wallet_id, sell_currency, -sell_amount)
# await update_holdings(wallet_id, buy_currency, buy_amount)
# # After processing all new transactions, close completed transactions
# await close_completed_transactions(wallet_id)
async def update_holdings(wallet_id, currency, amount_change):
async with aiosqlite.connect(DATABASE_FILE) as db:
@ -258,6 +209,114 @@ async def get_profit_loss(wallet_id, currency, start_date=None, end_date=None):
result = await cursor.fetchone()
return result[0] if result else 0
# # # # # # USERS
# For this example, we'll use a simple dictionary to store users
users = {
"db": {"id": 1, "username": "db", "email": "user1@example.com", "password": "db"},
"popov": {"id": 2, "username": "popov", "email": "user2@example.com", "password": "popov"}
}
def get_or_create_user(email, google_id):
user = next((u for u in users.values() if u['email'] == email), None)
if not user:
user_id = max(u['id'] for u in users.values()) + 1
username = email.split('@')[0] # Use the part before @ as username
user = {
'id': user_id,
'username': username,
'email': email,
'google_id': google_id
}
users[username] = user
return user
def authenticate_user(username, password):
"""
Authenticate a user based on username and password.
Returns user data if authentication is successful, None otherwise.
"""
user = users.get(username)
if user and user['password'] == password:
return {"id": user['id'], "username": user['username'], "email": user['email']}
return None
def get_user_by_id(user_id):
"""
Retrieve a user by their ID.
"""
for user in users.values():
if user['id'] == int(user_id):
return {"id": user['id'], "username": user['username'], "email": user['email']}
return None
def store_api_key(user_id, api_key):
"""
Store the generated API key for a user.
"""
# In a real application, you would store this in a database
# For this example, we'll just print it
print(f"Storing API key {api_key} for user {user_id}")
# async def get_new_transactions(wallet_address, rpc_url):
# async with AsyncClient(rpc_url) as client:
# last_tx = await get_last_stored_transaction(wallet_address)
# if last_tx:
# last_signature, last_timestamp = last_tx
# else:
# # If no transactions are stored, we'll fetch all transactions
# last_signature = None
# last_timestamp = None
# new_transactions = []
# # Get the transaction history for the wallet
# tx_history = await client.get_signatures_for_address(wallet_address, before=last_signature)
# for tx in tx_history.value:
# # Check if the transaction is newer than the last stored one
# if not last_timestamp or tx.block_time > datetime.fromisoformat(last_timestamp).timestamp():
# # Fetch the full transaction details
# tx_details = await client.get_transaction(tx.signature, commitment=Confirmed)
# new_transactions.append(tx_details)
# return new_transactions
# async def process_new_transactions(wallet_id, wallet_address, rpc_url):
# new_transactions = await get_new_transactions(wallet_address, rpc_url)
# for tx in new_transactions:
# # Process the transaction and extract relevant information
# # This is a placeholder - you'll need to implement the actual logic based on your requirements
# transaction_type = "swap" # Determine the type based on the transaction data
# sell_currency = "SOL" # Extract from transaction data
# sell_amount = 1.0 # Extract from transaction data
# sell_value = 100.0 # Extract from transaction data
# buy_currency = "USDC" # Extract from transaction data
# buy_amount = 100.0 # Extract from transaction data
# buy_value = 100.0 # Extract from transaction data
# solana_signature = tx.transaction.signatures[0]
# # Store the transaction in the database
# await store_transaction(
# wallet_id, transaction_type, sell_currency, sell_amount, sell_value,
# buy_currency, buy_amount, buy_value, solana_signature
# )
# # Update holdings
# await update_holdings(wallet_id, sell_currency, -sell_amount)
# await update_holdings(wallet_id, buy_currency, buy_amount)
# # After processing all new transactions, close completed transactions
# await close_completed_transactions(wallet_id)
# Example usage
if __name__ == "__main__":
import asyncio

View File

@ -1,75 +1,112 @@
from flask import Flask, jsonify, request, render_template, redirect, url_for
# from flask_oauthlib.client import OAuth
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
import secrets
from modules import storage
import os
app = Flask(__name__, template_folder='../templates', static_folder='../static')
app.config['SECRET_KEY'] = 'your-secret-key'
login_manager = LoginManager(app)
login_manager.login_view = 'login'
def init_app():
app = Flask(__name__, template_folder='../templates', static_folder='../static')
app.config['SECRET_KEY'] = 'your-secret-key'
login_manager = LoginManager(app)
login_manager.login_view = 'login'
class User(UserMixin):
def __init__(self, id, username, email):
self.id = id
self.username = username
self.email = email
# oauth = OAuth(app)
# google = oauth.remote_app(
# 'google',
# consumer_key='YOUR_GOOGLE_CLIENT_ID',
# consumer_secret='YOUR_GOOGLE_CLIENT_SECRET',
# request_token_params={
# 'scope': 'email'
# },
# base_url='https://www.googleapis.com/oauth2/v1/',
# request_token_url=None,
# access_token_method='POST',
# access_token_url='https://accounts.google.com/o/oauth2/token',
# authorize_url='https://accounts.google.com/o/oauth2/auth',
# )
@login_manager.user_loader
def load_user(user_id):
user_data = storage.get_user_by_id(user_id)
if user_data:
return User(id=user_data['id'], username=user_data['username'], email=user_data['email'])
return None
@app.route('/')
def index():
return render_template('index.html')
login_manager = LoginManager()
login_manager.init_app(app)
@app.route('/login/google/authorized')
def authorized():
# resp = google.authorized_response()
# if resp is None or resp.get('access_token') is None:
# return 'Access denied: reason={} error={}'.format(
# request.args['error_reason'],
# request.args['error_description']
# )
# session['google_token'] = (resp['access_token'], '')
# user_info = google.get('userinfo')
# user = storage.get_or_create_user(user_info.data['email'], user_info.data['id'])
# login_user(user)
return redirect(url_for('index'))
class User(UserMixin):
def __init__(self, id, username, email):
self.id = id
self.username = username
self.email = email
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = storage.authenticate_user(username, password)
if user:
login_user(User(id=user['id'], username=user['username'], email=user['email']))
return redirect(url_for('dashboard'))
else:
return render_template('login.html', error='Invalid credentials')
return render_template('login.html')
@login_manager.user_loader
def load_user(user_id):
user_data = storage.get_user_by_id(user_id)
if user_data:
return User(id=user_data['id'], username=user_data['username'], email=user_data['email'])
return None
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('index'))
@app.route('/')
def index():
return render_template('index.html')
@app.route('/dashboard')
@login_required
def dashboard():
return render_template('dashboard.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = storage.authenticate_user(username, password)
if user:
login_user(User(id=user['id'], username=user['username'], email=user['email']))
return redirect(url_for('dashboard'))
else:
return render_template('login.html', error='Invalid credentials')
elif request.args.get('google'):
return google.authorize(callback=url_for('authorized', _external=True))
return render_template('login.html')
@app.route('/generate_api_key', methods=['POST'])
@login_required
def generate_api_key():
api_key = secrets.token_urlsafe(32)
storage.store_api_key(current_user.id, api_key)
return jsonify({'api_key': api_key})
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('index'))
@app.route('/wallet/<int:wallet_id>/transactions', methods=['GET'])
@login_required
def get_transactions(wallet_id):
transactions = storage.get_transactions(wallet_id)
return jsonify(transactions)
@app.route('/dashboard')
@login_required
def dashboard():
return render_template('dashboard.html')
@app.route('/wallet/<int:wallet_id>/holdings', methods=['GET'])
@login_required
def get_holdings(wallet_id):
holdings = storage.get_holdings(wallet_id)
return jsonify(holdings)
@app.route('/generate_api_key', methods=['POST'])
@login_required
def generate_api_key():
api_key = secrets.token_urlsafe(32)
storage.store_api_key(current_user.id, api_key)
return jsonify({'api_key': api_key})
@app.route('/wallet/<int:wallet_id>/transactions', methods=['GET'])
@login_required
def get_transactions(wallet_id):
transactions = storage.get_transactions(wallet_id)
return jsonify(transactions)
@app.route('/wallet/<int:wallet_id>/holdings', methods=['GET'])
@login_required
def get_holdings(wallet_id):
holdings = storage.get_holdings(wallet_id)
return jsonify(holdings)
# Implement other routes for reports, price alerts, following accounts, etc.
def init_app():
return app