From 5db7fcf6c7d84aeedbc56092cae8321072f1e1f9 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sat, 12 Oct 2024 22:41:55 +0300 Subject: [PATCH 01/13] envs --- crypto/sol/.env | 16 ++++++++++++---- crypto/sol/.env.example | 15 --------------- 2 files changed, 12 insertions(+), 19 deletions(-) delete mode 100644 crypto/sol/.env.example diff --git a/crypto/sol/.env b/crypto/sol/.env index ab435f5..4fbd146 100644 --- a/crypto/sol/.env +++ b/crypto/sol/.env @@ -3,14 +3,22 @@ SOLANA_WS_URL="wss://api.mainnet-beta.solana.com" SOLANA_WS_URL2="wss://mainnet.rpcpool.com" SOLANA_HTTP_URL="https://api.mainnet-beta.solana.com" -DEVELOPER_CHAT_ID="777826553" +# prod, @kevin_ai_robot: +BOT_NAME="Solower" +DEVELOPER_CHAT_ID="777826553" # https://api.telegram.org/bot{token}/getUpdates TELEGRAM_BOT_TOKEN="6805059978:AAHNJKuOeazMSJHc3-BXRCsFfEVyFHeFnjw" +# dev, @artitherobot: +BOT_NAME="DEV" +DEVELOPER_CHAT_ID="777826553" +TELEGRAM_BOT_TOKEN="6749075936:AAHUHiPTDEIu6JH7S2fQdibwsu6JVG3FNG0" + DISPLAY_CURRENCY=USD # Niki's to Sync: [PROD] - FOLLOWED_WALLET="7keSmTZozjmuX66gd9GBSJYEHnMqsyutWpvuuKtXZKDH" YOUR_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" +FOLLOWED_WALLET="7keSmTZozjmuX66gd9GBSJYEHnMqsyutWpvuuKtXZKDH" +YOUR_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" # Sync to main [DEV] -# FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" -# YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5" \ No newline at end of file +FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" +YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5" \ No newline at end of file diff --git a/crypto/sol/.env.example b/crypto/sol/.env.example deleted file mode 100644 index e949837..0000000 --- a/crypto/sol/.env.example +++ /dev/null @@ -1,15 +0,0 @@ - -SOLANA_WS_URL="wss://api.mainnet-beta.solana.com" -SOLANA_WS_URL2="wss://mainnet.rpcpool.com" -SOLANA_HTTP_URL="https://api.mainnet-beta.solana.com" -DEVELOPER_CHAT_ID="777826553" -# Niki's -# FOLLOWED_WALLET="9U7D916zuQ8qcL9kQZqkcroWhHGho5vD8VNekvztrutN" -# My test Brave sync wallet -FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" - -TELEGRAM_BOT_TOKEN="6805059978:AAHNJKuOeazMSJHc3-BXRCsFfEVyFHeFnjw" -DISPLAY_CURRENCY=USD - -YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5" -PK={} \ No newline at end of file From 3a89856b54e33ee646d6dfdd746961dc188c9cec Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sat, 12 Oct 2024 22:44:21 +0300 Subject: [PATCH 02/13] bot name in telegram messages --- crypto/sol/app.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 6ec719f..2c87c19 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -54,6 +54,7 @@ TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') +BOT_NAME = os.getenv("BOT_NAME") logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) @@ -130,7 +131,7 @@ except Exception as e: # # # # # # # # # # TELEGRAM # # # # # # # # # # async def send_telegram_message(message): try: - await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML) + await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) logging.info(f"Telegram message sent: {message}") # logging.info(f"Telegram message dummy sent: {message}") except Exception as e: From cfe04171c7db3ae7d0e4acbbbd4458f61015f924 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sun, 13 Oct 2024 00:08:05 +0300 Subject: [PATCH 03/13] init storage module --- crypto/sol/modules/storage.py | 214 ++++++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 crypto/sol/modules/storage.py diff --git a/crypto/sol/modules/storage.py b/crypto/sol/modules/storage.py new file mode 100644 index 0000000..2003934 --- /dev/null +++ b/crypto/sol/modules/storage.py @@ -0,0 +1,214 @@ +import aiosqlite +import json +from datetime import datetime + +DATABASE_FILE = "app_data.db" + +async def init_db(): + async with aiosqlite.connect(DATABASE_FILE) as db: + await db.executescript(""" + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + username TEXT UNIQUE NOT NULL, + password_hash TEXT NOT NULL, + email TEXT UNIQUE NOT NULL, + api_key TEXT UNIQUE, + plan TEXT DEFAULT 'free' + ); + + CREATE TABLE IF NOT EXISTS wallets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER, + address TEXT NOT NULL, + name TEXT, + FOREIGN KEY (user_id) REFERENCES users(id) + ); + + CREATE TABLE IF NOT EXISTS transactions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + wallet_id INTEGER, + timestamp TEXT, + type TEXT, + sell_currency TEXT, + sell_amount REAL, + sell_value REAL, + buy_currency TEXT, + buy_amount REAL, + buy_value REAL, + closed BOOLEAN DEFAULT 0, + details TEXT, + FOREIGN KEY (wallet_id) REFERENCES wallets(id) + ); + + CREATE TABLE IF NOT EXISTS holdings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + wallet_id INTEGER, + currency TEXT, + amount REAL, + last_updated TEXT, + FOREIGN KEY (wallet_id) REFERENCES wallets(id) + ); + + CREATE TABLE IF NOT EXISTS price_alerts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER, + currency TEXT, + target_price REAL, + alert_type TEXT, + FOREIGN KEY (user_id) REFERENCES users(id) + ); + + CREATE TABLE IF NOT EXISTS followed_accounts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER, + address TEXT, + followed_address TEXT, + name TEXT, + FOREIGN KEY (address) REFERENCES wallets(address), + FOREIGN KEY (followed_address) REFERENCES wallets(address), + FOREIGN KEY (user_id) REFERENCES users(id) + ); + """) + await db.commit() + +async def store_transaction(wallet_id, transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, details=None): + async with aiosqlite.connect(DATABASE_FILE) as db: + await db.execute(""" + INSERT INTO transactions (wallet_id, timestamp, type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, details) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, (wallet_id, datetime.now().isoformat(), transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, json.dumps(details or {}))) + await db.commit() + +async def update_holdings(wallet_id, currency, amount_change): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute("SELECT amount FROM holdings WHERE wallet_id = ? AND currency = ?", (wallet_id, currency)) + result = await cursor.fetchone() + if result: + new_amount = result[0] + amount_change + await db.execute("UPDATE holdings SET amount = ?, last_updated = ? WHERE wallet_id = ? AND currency = ?", + (new_amount, datetime.now().isoformat(), wallet_id, currency)) + else: + await db.execute("INSERT INTO holdings (wallet_id, currency, amount, last_updated) VALUES (?, ?, ?, ?)", + (wallet_id, currency, amount_change, datetime.now().isoformat())) + await db.commit() + +async def get_wallet_holdings(wallet_id): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute("SELECT currency, amount FROM holdings WHERE wallet_id = ?", (wallet_id,)) + return await cursor.fetchall() + +async def get_transaction_history(wallet_id, start_date=None, end_date=None, include_closed=False): + async with aiosqlite.connect(DATABASE_FILE) as db: + query = "SELECT * FROM transactions WHERE wallet_id = ?" + params = [wallet_id] + if not include_closed: + query += " AND closed = 0" + if start_date: + query += " AND timestamp >= ?" + params.append(start_date) + if end_date: + query += " AND timestamp <= ?" + params.append(end_date) + query += " ORDER BY timestamp DESC" + cursor = await db.execute(query, params) + return await cursor.fetchall() + +# New utility functions + +async def close_transaction(transaction_id): + async with aiosqlite.connect(DATABASE_FILE) as db: + await db.execute("UPDATE transactions SET closed = 1 WHERE id = ?", (transaction_id,)) + await db.commit() + +async def get_open_transactions(wallet_id, currency): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute(""" + SELECT * FROM transactions + WHERE wallet_id = ? AND buy_currency = ? AND closed = 0 + ORDER BY timestamp ASC + """, (wallet_id, currency)) + return await cursor.fetchall() + +async def calculate_current_holdings(wallet_id): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute(""" + SELECT + buy_currency AS currency, + SUM(buy_amount) - COALESCE( + (SELECT SUM(sell_amount) + FROM transactions t2 + WHERE t2.wallet_id = t1.wallet_id + AND t2.sell_currency = t1.buy_currency + AND t2.closed = 0), + 0 + ) AS amount + FROM transactions t1 + WHERE wallet_id = ? AND closed = 0 + GROUP BY buy_currency + HAVING amount > 0 + """, (wallet_id,)) + return await cursor.fetchall() + +STABLECOINS = ['USDC', 'USDT', 'SOL'] + +async def is_transaction_closed(wallet_id, transaction_id): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute(""" + SELECT t1.buy_currency, t1.buy_amount, + (SELECT SUM(sell_amount) + FROM transactions t2 + WHERE t2.wallet_id = t1.wallet_id + AND t2.sell_currency = t1.buy_currency + AND t2.timestamp > t1.timestamp) AS sold_amount + FROM transactions t1 + WHERE t1.id = ? AND t1.wallet_id = ? + """, (transaction_id, wallet_id)) + result = await cursor.fetchone() + + if result: + buy_currency, buy_amount, sold_amount = result + return sold_amount is not None and sold_amount >= buy_amount + return False + +async def close_completed_transactions(wallet_id): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute(""" + SELECT id FROM transactions + WHERE wallet_id = ? AND closed = 0 AND buy_currency NOT IN (?) + """, (wallet_id, ','.join(STABLECOINS))) + transactions = await cursor.fetchall() + + for (transaction_id,) in transactions: + if await is_transaction_closed(wallet_id, transaction_id): + await close_transaction(transaction_id) + +async def get_profit_loss(wallet_id, currency, start_date=None, end_date=None): + async with aiosqlite.connect(DATABASE_FILE) as db: + query = """ + SELECT + SUM(CASE WHEN sell_currency = ? THEN sell_value ELSE -buy_value END) as profit_loss + FROM transactions + WHERE wallet_id = ? AND (sell_currency = ? OR buy_currency = ?) + """ + params = [currency, wallet_id, currency, currency] + + if start_date: + query += " AND timestamp >= ?" + params.append(start_date) + if end_date: + query += " AND timestamp <= ?" + params.append(end_date) + + cursor = await db.execute(query, params) + result = await cursor.fetchone() + return result[0] if result else 0 + +# Example usage +if __name__ == "__main__": + import asyncio + + async def main(): + await init_db() + # Add more test functions here + + asyncio.run(main()) \ No newline at end of file From d9ffbcfa13f320815b2d70393054e4e4c4717b5b Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sun, 13 Oct 2024 00:12:40 +0300 Subject: [PATCH 04/13] store sol tr signature --- crypto/sol/modules/storage.py | 63 ++++++++++++++++++++++++++++++++--- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/crypto/sol/modules/storage.py b/crypto/sol/modules/storage.py index 2003934..ae4aaaf 100644 --- a/crypto/sol/modules/storage.py +++ b/crypto/sol/modules/storage.py @@ -37,6 +37,7 @@ async def init_db(): buy_value REAL, closed BOOLEAN DEFAULT 0, details TEXT, + solana_signature TEXT UNIQUE, FOREIGN KEY (wallet_id) REFERENCES wallets(id) ); @@ -71,14 +72,68 @@ async def init_db(): """) await db.commit() -async def store_transaction(wallet_id, transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, details=None): +async def store_transaction(wallet_id, transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, details=None): async with aiosqlite.connect(DATABASE_FILE) as db: await db.execute(""" - INSERT INTO transactions (wallet_id, timestamp, type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, details) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, (wallet_id, datetime.now().isoformat(), transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, json.dumps(details or {}))) + INSERT INTO transactions (wallet_id, timestamp, type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, details) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, (wallet_id, datetime.now().isoformat(), transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, json.dumps(details or {}))) await db.commit() +# async def get_new_transactions(wallet_address, rpc_url): +# async with AsyncClient(rpc_url) as client: +# last_tx = await get_last_stored_transaction(wallet_address) + +# if last_tx: +# last_signature, last_timestamp = last_tx +# else: +# # If no transactions are stored, we'll fetch all transactions +# last_signature = None +# last_timestamp = None + +# new_transactions = [] + +# # Get the transaction history for the wallet +# tx_history = await client.get_signatures_for_address(wallet_address, before=last_signature) + +# for tx in tx_history.value: +# # Check if the transaction is newer than the last stored one +# if not last_timestamp or tx.block_time > datetime.fromisoformat(last_timestamp).timestamp(): +# # Fetch the full transaction details +# tx_details = await client.get_transaction(tx.signature, commitment=Confirmed) +# new_transactions.append(tx_details) + +# return new_transactions + +# async def process_new_transactions(wallet_id, wallet_address, rpc_url): +# new_transactions = await get_new_transactions(wallet_address, rpc_url) + +# for tx in new_transactions: +# # Process the transaction and extract relevant information +# # This is a placeholder - you'll need to implement the actual logic based on your requirements +# transaction_type = "swap" # Determine the type based on the transaction data +# sell_currency = "SOL" # Extract from transaction data +# sell_amount = 1.0 # Extract from transaction data +# sell_value = 100.0 # Extract from transaction data +# buy_currency = "USDC" # Extract from transaction data +# buy_amount = 100.0 # Extract from transaction data +# buy_value = 100.0 # Extract from transaction data +# solana_signature = tx.transaction.signatures[0] + +# # Store the transaction in the database +# await store_transaction( +# wallet_id, transaction_type, sell_currency, sell_amount, sell_value, +# buy_currency, buy_amount, buy_value, solana_signature +# ) + +# # Update holdings +# await update_holdings(wallet_id, sell_currency, -sell_amount) +# await update_holdings(wallet_id, buy_currency, buy_amount) + +# # After processing all new transactions, close completed transactions +# await close_completed_transactions(wallet_id) + + async def update_holdings(wallet_id, currency, amount_change): async with aiosqlite.connect(DATABASE_FILE) as db: cursor = await db.execute("SELECT amount FROM holdings WHERE wallet_id = ? AND currency = ?", (wallet_id, currency)) From b9410f29868d0a066d655d2b8b62ff4dc23af2bb Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sun, 13 Oct 2024 00:27:02 +0300 Subject: [PATCH 05/13] init flask and PWA - file structure --- crypto/sol/app.py | 4 ++ crypto/sol/modules/__init__.py | 0 crypto/sol/modules/webui.py | 58 +++++++++++++++++++++++++++++ crypto/sol/static/css/styles.css | 0 crypto/sol/static/images/logo.png | 0 crypto/sol/static/{ => js}/app.js | 0 crypto/sol/static/manifest.json | 0 crypto/sol/static/service-worker.js | 0 8 files changed, 62 insertions(+) create mode 100644 crypto/sol/modules/__init__.py create mode 100644 crypto/sol/modules/webui.py create mode 100644 crypto/sol/static/css/styles.css create mode 100644 crypto/sol/static/images/logo.png rename crypto/sol/static/{ => js}/app.js (100%) create mode 100644 crypto/sol/static/manifest.json create mode 100644 crypto/sol/static/service-worker.js diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 2c87c19..131d3b2 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -139,6 +139,10 @@ async def send_telegram_message(message): +# # # # # # # # # # DATABASE # # # # # # # # # # + + + # # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # # async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: diff --git a/crypto/sol/modules/__init__.py b/crypto/sol/modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py new file mode 100644 index 0000000..09c4e28 --- /dev/null +++ b/crypto/sol/modules/webui.py @@ -0,0 +1,58 @@ +from flask import Flask, jsonify, request +from flask_login import LoginManager, UserMixin, login_user, login_required, current_user +import secrets +from modules import storage # Import your storage module + +app = Flask(__name__) +app.config['SECRET_KEY'] = 'your-secret-key' +login_manager = LoginManager(app) + +class User(UserMixin): + def __init__(self, id, username, email): + self.id = id + self.username = username + self.email = email + +@login_manager.user_loader +def load_user(user_id): + user_data = storage.get_user_by_id(user_id) + if user_data: + return User(id=user_data['id'], username=user_data['username'], email=user_data['email']) + return None + +@app.route('/login', methods=['POST']) +def login(): + data = request.json + username = data.get('username') + password = data.get('password') + + user = storage.authenticate_user(username, password) + if user: + login_user(User(id=user['id'], username=user['username'], email=user['email'])) + return jsonify({'message': 'Login successful'}), 200 + else: + return jsonify({'message': 'Invalid credentials'}), 401 + +@app.route('/generate_api_key', methods=['POST']) +@login_required +def generate_api_key(): + api_key = secrets.token_urlsafe(32) + storage.store_api_key(current_user.id, api_key) + return jsonify({'api_key': api_key}) + +@app.route('/wallet//transactions', methods=['GET']) +@login_required +def get_transactions(wallet_id): + transactions = storage.get_transactions(wallet_id) + return jsonify(transactions) + +@app.route('/wallet//holdings', methods=['GET']) +@login_required +def get_holdings(wallet_id): + holdings = storage.get_holdings(wallet_id) + return jsonify(holdings) + +# Implement other routes for reports, price alerts, following accounts, etc. + +def init_app(): + return app \ No newline at end of file diff --git a/crypto/sol/static/css/styles.css b/crypto/sol/static/css/styles.css new file mode 100644 index 0000000..e69de29 diff --git a/crypto/sol/static/images/logo.png b/crypto/sol/static/images/logo.png new file mode 100644 index 0000000..e69de29 diff --git a/crypto/sol/static/app.js b/crypto/sol/static/js/app.js similarity index 100% rename from crypto/sol/static/app.js rename to crypto/sol/static/js/app.js diff --git a/crypto/sol/static/manifest.json b/crypto/sol/static/manifest.json new file mode 100644 index 0000000..e69de29 diff --git a/crypto/sol/static/service-worker.js b/crypto/sol/static/service-worker.js new file mode 100644 index 0000000..e69de29 From 49384accf6bfa3f09e1a86622aa3103194ed3e66 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sun, 13 Oct 2024 00:39:46 +0300 Subject: [PATCH 06/13] flask web app (wip) --- crypto/sol/app.py | 6 ++++ crypto/sol/modules/webui.py | 47 ++++++++++++++++++++--------- crypto/sol/requirements.txt | 1 + crypto/sol/static/css/styles.css | 46 ++++++++++++++++++++++++++++ crypto/sol/static/js/app.js | 18 +++++++++++ crypto/sol/static/manifest.json | 20 ++++++++++++ crypto/sol/static/service-worker.js | 8 +++++ crypto/sol/templates/base.html | 36 ++++++++++++++++++++++ crypto/sol/templates/dashboard.html | 23 ++++++++++++++ crypto/sol/templates/index.html | 27 ++++------------- crypto/sol/templates/login.html | 17 +++++++++++ crypto/sol/templates/swap.html | 21 +++++++++++++ 12 files changed, 234 insertions(+), 36 deletions(-) create mode 100644 crypto/sol/templates/base.html create mode 100644 crypto/sol/templates/dashboard.html create mode 100644 crypto/sol/templates/login.html create mode 100644 crypto/sol/templates/swap.html diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 131d3b2..d8f4c78 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -1463,7 +1463,13 @@ async def main(): logging.info("Restarting wallet_watch_loop") await send_telegram_message("Restarting wallet_watch_loop") + +from modules.webui import init_app + async def run_flask(): + # loop = asyncio.get_running_loop() + # await loop.run_in_executor(None, lambda: app.run(debug=False, port=3001, use_reloader=False)) + app = init_app() loop = asyncio.get_running_loop() await loop.run_in_executor(None, lambda: app.run(debug=False, port=3001, use_reloader=False)) diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py index 09c4e28..4cae030 100644 --- a/crypto/sol/modules/webui.py +++ b/crypto/sol/modules/webui.py @@ -1,11 +1,12 @@ -from flask import Flask, jsonify, request -from flask_login import LoginManager, UserMixin, login_user, login_required, current_user +from flask import Flask, jsonify, request, render_template, redirect, url_for +from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user import secrets -from modules import storage # Import your storage module +from modules import storage -app = Flask(__name__) +app = Flask(__name__, template_folder='../templates', static_folder='../static') app.config['SECRET_KEY'] = 'your-secret-key' login_manager = LoginManager(app) +login_manager.login_view = 'login' class User(UserMixin): def __init__(self, id, username, email): @@ -20,18 +21,34 @@ def load_user(user_id): return User(id=user_data['id'], username=user_data['username'], email=user_data['email']) return None -@app.route('/login', methods=['POST']) +@app.route('/') +def index(): + return render_template('index.html') + +@app.route('/login', methods=['GET', 'POST']) def login(): - data = request.json - username = data.get('username') - password = data.get('password') - - user = storage.authenticate_user(username, password) - if user: - login_user(User(id=user['id'], username=user['username'], email=user['email'])) - return jsonify({'message': 'Login successful'}), 200 - else: - return jsonify({'message': 'Invalid credentials'}), 401 + if request.method == 'POST': + username = request.form.get('username') + password = request.form.get('password') + + user = storage.authenticate_user(username, password) + if user: + login_user(User(id=user['id'], username=user['username'], email=user['email'])) + return redirect(url_for('dashboard')) + else: + return render_template('login.html', error='Invalid credentials') + return render_template('login.html') + +@app.route('/logout') +@login_required +def logout(): + logout_user() + return redirect(url_for('index')) + +@app.route('/dashboard') +@login_required +def dashboard(): + return render_template('dashboard.html') @app.route('/generate_api_key', methods=['POST']) @login_required diff --git a/crypto/sol/requirements.txt b/crypto/sol/requirements.txt index afc1b11..9a75c66 100644 --- a/crypto/sol/requirements.txt +++ b/crypto/sol/requirements.txt @@ -2,6 +2,7 @@ aiohttp==3.10.9 base58==2.1.1 dexscreener==1.1 Flask==3.0.3 +flask-login jupiter_python_sdk==0.0.2.0 python-dotenv==1.0.1 python-telegram-bot==21.6 diff --git a/crypto/sol/static/css/styles.css b/crypto/sol/static/css/styles.css index e69de29..d2db183 100644 --- a/crypto/sol/static/css/styles.css +++ b/crypto/sol/static/css/styles.css @@ -0,0 +1,46 @@ +/* Add your custom styles here */ +body { + font-family: Arial, sans-serif; + line-height: 1.6; + margin: 0; + padding: 0; +} + +header { + background-color: #4A90E2; + color: white; + padding: 1rem; +} + +nav ul { + list-style-type: none; + padding: 0; +} + +nav ul li { + display: inline; + margin-right: 1rem; +} + +nav ul li a { + color: white; + text-decoration: none; +} + +main { + padding: 2rem; +} + +footer { + background-color: #333; + color: white; + text-align: center; + padding: 1rem; + position: fixed; + bottom: 0; + width: 100%; +} + +@media (max-width: 768px) { + /* Add responsive styles for mobile devices */ +} \ No newline at end of file diff --git a/crypto/sol/static/js/app.js b/crypto/sol/static/js/app.js index 32143e6..4f3195f 100644 --- a/crypto/sol/static/js/app.js +++ b/crypto/sol/static/js/app.js @@ -1,3 +1,4 @@ + document.getElementById('connectWallet').addEventListener('click', async () => { try { const { solana } is window; @@ -26,3 +27,20 @@ document.getElementById('swapToken').addEventListener('click', () => { .then(response => response.json()) .then(data => alert(data.message)); }); + + +// Add your custom JavaScript here +document.addEventListener('DOMContentLoaded', () => { + const generateApiKeyButton = document.getElementById('generate-api-key'); + const apiKeyDisplay = document.getElementById('api-key-display'); + + if (generateApiKeyButton) { + generateApiKeyButton.addEventListener('click', async () => { + const response = await fetch('/generate_api_key', { method: 'POST' }); + const data = await response.json(); + apiKeyDisplay.textContent = `Your API Key: ${data.api_key}`; + }); + } + + // Add more JavaScript for fetching and displaying wallet data, transactions, and holdings +}); \ No newline at end of file diff --git a/crypto/sol/static/manifest.json b/crypto/sol/static/manifest.json index e69de29..4ea8f8c 100644 --- a/crypto/sol/static/manifest.json +++ b/crypto/sol/static/manifest.json @@ -0,0 +1,20 @@ +{ + "name": "Crypto Portfolio Tracker", + "short_name": "CryptoTracker", + "start_url": "/", + "display": "standalone", + "background_color": "#ffffff", + "theme_color": "#4A90E2", + "icons": [ + { + "src": "/static/images/logo-192x192.png", + "sizes": "192x192", + "type": "image/png" + }, + { + "src": "/static/images/logo-512x512.png", + "sizes": "512x512", + "type": "image/png" + } + ] + } \ No newline at end of file diff --git a/crypto/sol/static/service-worker.js b/crypto/sol/static/service-worker.js index e69de29..0559efc 100644 --- a/crypto/sol/static/service-worker.js +++ b/crypto/sol/static/service-worker.js @@ -0,0 +1,8 @@ +// Add service worker code for offline functionality and caching +self.addEventListener('install', (event) => { + // Perform install steps + }); + + self.addEventListener('fetch', (event) => { + // Handle fetch events + }); \ No newline at end of file diff --git a/crypto/sol/templates/base.html b/crypto/sol/templates/base.html new file mode 100644 index 0000000..cdd2ba2 --- /dev/null +++ b/crypto/sol/templates/base.html @@ -0,0 +1,36 @@ + + + + + + {% block title %}Crypto Portfolio Tracker{% endblock %} + + + + + +
+ +
+ +
+ {% block content %}{% endblock %} +
+ +
+

© 2023 Crypto Portfolio Tracker

+
+ + + + \ No newline at end of file diff --git a/crypto/sol/templates/dashboard.html b/crypto/sol/templates/dashboard.html new file mode 100644 index 0000000..7733d1f --- /dev/null +++ b/crypto/sol/templates/dashboard.html @@ -0,0 +1,23 @@ +{% extends "base.html" %} + +{% block content %} +

Dashboard

+

Welcome, {{ current_user.username }}!

+ +

Your Wallets

+
+ +

Recent Transactions

+
+ +

Holdings

+
+ + +

+ + +{% endblock %} \ No newline at end of file diff --git a/crypto/sol/templates/index.html b/crypto/sol/templates/index.html index 0de89fe..5ebd64a 100644 --- a/crypto/sol/templates/index.html +++ b/crypto/sol/templates/index.html @@ -1,21 +1,6 @@ - - - - - - Token Swapper - - -

Token Swapper

-
- -
-
- - - -
- - - - +{% extends "base.html" %} + +{% block content %} +

Welcome to Crypto Portfolio Tracker

+

Track your cryptocurrency investments with ease.

+{% endblock %} \ No newline at end of file diff --git a/crypto/sol/templates/login.html b/crypto/sol/templates/login.html new file mode 100644 index 0000000..7bf3068 --- /dev/null +++ b/crypto/sol/templates/login.html @@ -0,0 +1,17 @@ +{% extends "base.html" %} + +{% block content %} +

Login

+
+ + + + + + + +
+{% if error %} +

{{ error }}

+{% endif %} +{% endblock %} \ No newline at end of file diff --git a/crypto/sol/templates/swap.html b/crypto/sol/templates/swap.html new file mode 100644 index 0000000..0de89fe --- /dev/null +++ b/crypto/sol/templates/swap.html @@ -0,0 +1,21 @@ + + + + + + Token Swapper + + +

Token Swapper

+
+ +
+
+ + + +
+ + + + From a655c5bd88a23923d072b046484e10e5a4cbf161 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sun, 13 Oct 2024 01:37:10 +0300 Subject: [PATCH 07/13] flask webui looking good --- app_data.db | Bin 0 -> 49152 bytes crypto/sol/app.py | 2 + crypto/sol/modules/storage.py | 167 +++++++++++++++++++++----------- crypto/sol/modules/webui.py | 151 ++++++++++++++++++----------- crypto/sol/requirements.in | 12 --- crypto/sol/requirements.txt | 2 + crypto/sol/static/js/app.js | 67 +++++++------ crypto/sol/templates/login.html | 21 ++-- 8 files changed, 257 insertions(+), 165 deletions(-) create mode 100644 app_data.db delete mode 100644 crypto/sol/requirements.in diff --git a/app_data.db b/app_data.db new file mode 100644 index 0000000000000000000000000000000000000000..1d388bb4120fc177e4e9fb87ae8cc67f6472afa7 GIT binary patch literal 49152 zcmeI)zi!(`9KdlhvPJn{RiP+2qye}Xv5+8cnxW`Wpo&WeLKHbsWT0M3oW!$Dg!&Vb zs_UVPV-)Dphv?EbX{SCyA0cZ{6d6jMBuBI&Ic|&$#FrM$f5+q9=YB`(ELcyU)VxSq z`@NteBI};^v8Ld@jRaL@yfz@BVOtg>$6r3?T+)=TJaxkW$_Q~qs`)9 z#ovlYYk#l)xm;fTweZsTJ>JcP00Iag@V^CKe7|Cto16O2&mz(C<$llidq>iBM7!M^ zbfa);$LdzYt~70{S-D@c+AYyRW)!w z@Y>Q5z6_#pa(yM$tdmDRT#L2`K_I*B;~5PSiQqs+jw)N+4wc$8xT=>E9rxd?d{5Lt!aHe z-Q!#t3D2K(7Y=*A=n5zF4!R;51jAlw?bfSLcJ1lB-Y3YuEac6H>$>K3UHN19+>eJQ zju=Eewe3tslXG`+Yw6^ZykTCyuD|$6Mdv7rM++krlcSwi7LJ#C#98&7rd8jqz2&a= zt`D(G-0vPQ=FMBT^gU%V;tb{UL413b=Qi^xqjLv0lCs^7&n%zVx?!58e(I?_)zeD- zx|TX$?YdIoGo4Y~&4KKTFg)r7u5&2DLv;g~lUR1*z#k?W2K1b?qVGA+h>9yTo!_FTMD$7QdJf zKmY**5I_I{1Q0*~0R#|0AaeqXxzdu`V*H=EhNVIXAbp|JiO|YKH&< z2q1s}0tg_000IagfIuR^_@6jH009ILKmY**5I_I{1Q0+V`vQ#rv){(l5CH@bKmY** O5I_I{1Q0*~f&Ty*Fv{lu literal 0 HcmV?d00001 diff --git a/crypto/sol/app.py b/crypto/sol/app.py index d8f4c78..8848e46 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -1465,6 +1465,7 @@ async def main(): from modules.webui import init_app +from modules.storage import init_db async def run_flask(): # loop = asyncio.get_running_loop() @@ -1475,6 +1476,7 @@ async def run_flask(): async def run_all(): await asyncio.gather( + init_db(), main(), run_flask() ) diff --git a/crypto/sol/modules/storage.py b/crypto/sol/modules/storage.py index ae4aaaf..26ea7e2 100644 --- a/crypto/sol/modules/storage.py +++ b/crypto/sol/modules/storage.py @@ -1,8 +1,12 @@ +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + import aiosqlite import json from datetime import datetime -DATABASE_FILE = "app_data.db" +DATABASE_FILE = "./app_data.db" async def init_db(): async with aiosqlite.connect(DATABASE_FILE) as db: @@ -79,60 +83,7 @@ async def store_transaction(wallet_id, transaction_type, sell_currency, sell_amo VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (wallet_id, datetime.now().isoformat(), transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, json.dumps(details or {}))) await db.commit() - -# async def get_new_transactions(wallet_address, rpc_url): -# async with AsyncClient(rpc_url) as client: -# last_tx = await get_last_stored_transaction(wallet_address) -# if last_tx: -# last_signature, last_timestamp = last_tx -# else: -# # If no transactions are stored, we'll fetch all transactions -# last_signature = None -# last_timestamp = None - -# new_transactions = [] - -# # Get the transaction history for the wallet -# tx_history = await client.get_signatures_for_address(wallet_address, before=last_signature) - -# for tx in tx_history.value: -# # Check if the transaction is newer than the last stored one -# if not last_timestamp or tx.block_time > datetime.fromisoformat(last_timestamp).timestamp(): -# # Fetch the full transaction details -# tx_details = await client.get_transaction(tx.signature, commitment=Confirmed) -# new_transactions.append(tx_details) - -# return new_transactions - -# async def process_new_transactions(wallet_id, wallet_address, rpc_url): -# new_transactions = await get_new_transactions(wallet_address, rpc_url) - -# for tx in new_transactions: -# # Process the transaction and extract relevant information -# # This is a placeholder - you'll need to implement the actual logic based on your requirements -# transaction_type = "swap" # Determine the type based on the transaction data -# sell_currency = "SOL" # Extract from transaction data -# sell_amount = 1.0 # Extract from transaction data -# sell_value = 100.0 # Extract from transaction data -# buy_currency = "USDC" # Extract from transaction data -# buy_amount = 100.0 # Extract from transaction data -# buy_value = 100.0 # Extract from transaction data -# solana_signature = tx.transaction.signatures[0] - -# # Store the transaction in the database -# await store_transaction( -# wallet_id, transaction_type, sell_currency, sell_amount, sell_value, -# buy_currency, buy_amount, buy_value, solana_signature -# ) - -# # Update holdings -# await update_holdings(wallet_id, sell_currency, -sell_amount) -# await update_holdings(wallet_id, buy_currency, buy_amount) - -# # After processing all new transactions, close completed transactions -# await close_completed_transactions(wallet_id) - async def update_holdings(wallet_id, currency, amount_change): async with aiosqlite.connect(DATABASE_FILE) as db: @@ -258,6 +209,114 @@ async def get_profit_loss(wallet_id, currency, start_date=None, end_date=None): result = await cursor.fetchone() return result[0] if result else 0 +# # # # # # USERS + +# For this example, we'll use a simple dictionary to store users +users = { + "db": {"id": 1, "username": "db", "email": "user1@example.com", "password": "db"}, + "popov": {"id": 2, "username": "popov", "email": "user2@example.com", "password": "popov"} +} + +def get_or_create_user(email, google_id): + user = next((u for u in users.values() if u['email'] == email), None) + if not user: + user_id = max(u['id'] for u in users.values()) + 1 + username = email.split('@')[0] # Use the part before @ as username + user = { + 'id': user_id, + 'username': username, + 'email': email, + 'google_id': google_id + } + users[username] = user + return user + +def authenticate_user(username, password): + """ + Authenticate a user based on username and password. + Returns user data if authentication is successful, None otherwise. + """ + user = users.get(username) + if user and user['password'] == password: + return {"id": user['id'], "username": user['username'], "email": user['email']} + return None + +def get_user_by_id(user_id): + """ + Retrieve a user by their ID. + """ + for user in users.values(): + if user['id'] == int(user_id): + return {"id": user['id'], "username": user['username'], "email": user['email']} + return None + +def store_api_key(user_id, api_key): + """ + Store the generated API key for a user. + """ + # In a real application, you would store this in a database + # For this example, we'll just print it + print(f"Storing API key {api_key} for user {user_id}") + + + + + +# async def get_new_transactions(wallet_address, rpc_url): +# async with AsyncClient(rpc_url) as client: +# last_tx = await get_last_stored_transaction(wallet_address) + +# if last_tx: +# last_signature, last_timestamp = last_tx +# else: +# # If no transactions are stored, we'll fetch all transactions +# last_signature = None +# last_timestamp = None + +# new_transactions = [] + +# # Get the transaction history for the wallet +# tx_history = await client.get_signatures_for_address(wallet_address, before=last_signature) + +# for tx in tx_history.value: +# # Check if the transaction is newer than the last stored one +# if not last_timestamp or tx.block_time > datetime.fromisoformat(last_timestamp).timestamp(): +# # Fetch the full transaction details +# tx_details = await client.get_transaction(tx.signature, commitment=Confirmed) +# new_transactions.append(tx_details) + +# return new_transactions + +# async def process_new_transactions(wallet_id, wallet_address, rpc_url): +# new_transactions = await get_new_transactions(wallet_address, rpc_url) + +# for tx in new_transactions: +# # Process the transaction and extract relevant information +# # This is a placeholder - you'll need to implement the actual logic based on your requirements +# transaction_type = "swap" # Determine the type based on the transaction data +# sell_currency = "SOL" # Extract from transaction data +# sell_amount = 1.0 # Extract from transaction data +# sell_value = 100.0 # Extract from transaction data +# buy_currency = "USDC" # Extract from transaction data +# buy_amount = 100.0 # Extract from transaction data +# buy_value = 100.0 # Extract from transaction data +# solana_signature = tx.transaction.signatures[0] + +# # Store the transaction in the database +# await store_transaction( +# wallet_id, transaction_type, sell_currency, sell_amount, sell_value, +# buy_currency, buy_amount, buy_value, solana_signature +# ) + +# # Update holdings +# await update_holdings(wallet_id, sell_currency, -sell_amount) +# await update_holdings(wallet_id, buy_currency, buy_amount) + +# # After processing all new transactions, close completed transactions +# await close_completed_transactions(wallet_id) + + + # Example usage if __name__ == "__main__": import asyncio diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py index 4cae030..4dee698 100644 --- a/crypto/sol/modules/webui.py +++ b/crypto/sol/modules/webui.py @@ -1,75 +1,112 @@ from flask import Flask, jsonify, request, render_template, redirect, url_for +# from flask_oauthlib.client import OAuth from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user import secrets from modules import storage +import os -app = Flask(__name__, template_folder='../templates', static_folder='../static') -app.config['SECRET_KEY'] = 'your-secret-key' -login_manager = LoginManager(app) -login_manager.login_view = 'login' +def init_app(): + app = Flask(__name__, template_folder='../templates', static_folder='../static') + app.config['SECRET_KEY'] = 'your-secret-key' + login_manager = LoginManager(app) + login_manager.login_view = 'login' -class User(UserMixin): - def __init__(self, id, username, email): - self.id = id - self.username = username - self.email = email + # oauth = OAuth(app) + # google = oauth.remote_app( + # 'google', + # consumer_key='YOUR_GOOGLE_CLIENT_ID', + # consumer_secret='YOUR_GOOGLE_CLIENT_SECRET', + # request_token_params={ + # 'scope': 'email' + # }, + # base_url='https://www.googleapis.com/oauth2/v1/', + # request_token_url=None, + # access_token_method='POST', + # access_token_url='https://accounts.google.com/o/oauth2/token', + # authorize_url='https://accounts.google.com/o/oauth2/auth', + # ) -@login_manager.user_loader -def load_user(user_id): - user_data = storage.get_user_by_id(user_id) - if user_data: - return User(id=user_data['id'], username=user_data['username'], email=user_data['email']) - return None -@app.route('/') -def index(): - return render_template('index.html') + login_manager = LoginManager() + login_manager.init_app(app) + + @app.route('/login/google/authorized') + def authorized(): + # resp = google.authorized_response() + # if resp is None or resp.get('access_token') is None: + # return 'Access denied: reason={} error={}'.format( + # request.args['error_reason'], + # request.args['error_description'] + # ) + # session['google_token'] = (resp['access_token'], '') + # user_info = google.get('userinfo') + # user = storage.get_or_create_user(user_info.data['email'], user_info.data['id']) + # login_user(user) + return redirect(url_for('index')) + + + class User(UserMixin): + def __init__(self, id, username, email): + self.id = id + self.username = username + self.email = email -@app.route('/login', methods=['GET', 'POST']) -def login(): - if request.method == 'POST': - username = request.form.get('username') - password = request.form.get('password') - - user = storage.authenticate_user(username, password) - if user: - login_user(User(id=user['id'], username=user['username'], email=user['email'])) - return redirect(url_for('dashboard')) - else: - return render_template('login.html', error='Invalid credentials') - return render_template('login.html') + @login_manager.user_loader + def load_user(user_id): + user_data = storage.get_user_by_id(user_id) + if user_data: + return User(id=user_data['id'], username=user_data['username'], email=user_data['email']) + return None -@app.route('/logout') -@login_required -def logout(): - logout_user() - return redirect(url_for('index')) + @app.route('/') + def index(): + return render_template('index.html') -@app.route('/dashboard') -@login_required -def dashboard(): - return render_template('dashboard.html') + @app.route('/login', methods=['GET', 'POST']) + def login(): + if request.method == 'POST': + username = request.form.get('username') + password = request.form.get('password') + user = storage.authenticate_user(username, password) + if user: + login_user(User(id=user['id'], username=user['username'], email=user['email'])) + return redirect(url_for('dashboard')) + else: + return render_template('login.html', error='Invalid credentials') + elif request.args.get('google'): + return google.authorize(callback=url_for('authorized', _external=True)) + return render_template('login.html') -@app.route('/generate_api_key', methods=['POST']) -@login_required -def generate_api_key(): - api_key = secrets.token_urlsafe(32) - storage.store_api_key(current_user.id, api_key) - return jsonify({'api_key': api_key}) + @app.route('/logout') + @login_required + def logout(): + logout_user() + return redirect(url_for('index')) -@app.route('/wallet//transactions', methods=['GET']) -@login_required -def get_transactions(wallet_id): - transactions = storage.get_transactions(wallet_id) - return jsonify(transactions) + @app.route('/dashboard') + @login_required + def dashboard(): + return render_template('dashboard.html') -@app.route('/wallet//holdings', methods=['GET']) -@login_required -def get_holdings(wallet_id): - holdings = storage.get_holdings(wallet_id) - return jsonify(holdings) + @app.route('/generate_api_key', methods=['POST']) + @login_required + def generate_api_key(): + api_key = secrets.token_urlsafe(32) + storage.store_api_key(current_user.id, api_key) + return jsonify({'api_key': api_key}) + + @app.route('/wallet//transactions', methods=['GET']) + @login_required + def get_transactions(wallet_id): + transactions = storage.get_transactions(wallet_id) + return jsonify(transactions) + + @app.route('/wallet//holdings', methods=['GET']) + @login_required + def get_holdings(wallet_id): + holdings = storage.get_holdings(wallet_id) + return jsonify(holdings) # Implement other routes for reports, price alerts, following accounts, etc. -def init_app(): return app \ No newline at end of file diff --git a/crypto/sol/requirements.in b/crypto/sol/requirements.in deleted file mode 100644 index 284d70d..0000000 --- a/crypto/sol/requirements.in +++ /dev/null @@ -1,12 +0,0 @@ -aiohttp==3.10.9 -aiohttp==3.10.5 -base58==2.1.1 -dexscreener==1.1 -Flask==3.0.3 -jupiter_python_sdk==0.0.2.0 -python-dotenv==1.0.1 -python-telegram-bot==21.6 -Requests==2.32.3 -solana==0.34.3 -solders==0.21.0 -websockets==10.4 diff --git a/crypto/sol/requirements.txt b/crypto/sol/requirements.txt index 9a75c66..7d2e9ed 100644 --- a/crypto/sol/requirements.txt +++ b/crypto/sol/requirements.txt @@ -1,8 +1,10 @@ aiohttp==3.10.9 +aiosqlite base58==2.1.1 dexscreener==1.1 Flask==3.0.3 flask-login +flask-oauthlib jupiter_python_sdk==0.0.2.0 python-dotenv==1.0.1 python-telegram-bot==21.6 diff --git a/crypto/sol/static/js/app.js b/crypto/sol/static/js/app.js index 4f3195f..80885c5 100644 --- a/crypto/sol/static/js/app.js +++ b/crypto/sol/static/js/app.js @@ -1,39 +1,42 @@ - -document.getElementById('connectWallet').addEventListener('click', async () => { - try { - const { solana } is window; - if (solana && solana.isPhantom) { - const response = await solana.connect({ onlyIfTrusted: true }); - console.log('Connected with Public Key:', response.publicKey.toString()); - } else { - alert('Phantom wallet not found. Please install it.'); - } - } catch (error) { - console.error(error); - alert('Connection to Phantom Wallet failed'); - } -}); - -document.getElementById('swapToken').addEventListener('click', () => { - const tokenName = document.getElementById('tokenName').value; - const amount = document.getElementById('amount').value; - fetch('/swap', { - method: 'POST', - headers: { - 'Content-Type': 'application/json' - }, - body: JSON.stringify({token_name: tokenName, amount: amount}) - }) - .then(response => response.json()) - .then(data => alert(data.message)); -}); - - -// Add your custom JavaScript here document.addEventListener('DOMContentLoaded', () => { + const connectWalletButton = document.getElementById('connectWallet'); + const swapTokenButton = document.getElementById('swapToken'); const generateApiKeyButton = document.getElementById('generate-api-key'); const apiKeyDisplay = document.getElementById('api-key-display'); + if (connectWalletButton) { + connectWalletButton.addEventListener('click', async () => { + try { + const { solana } = window; + if (solana && solana.isPhantom) { + const response = await solana.connect({ onlyIfTrusted: true }); + console.log('Connected with Public Key:', response.publicKey.toString()); + } else { + alert('Phantom wallet not found. Please install it.'); + } + } catch (error) { + console.error(error); + alert('Connection to Phantom Wallet failed'); + } + }); + } + + if (swapTokenButton) { + swapTokenButton.addEventListener('click', () => { + const tokenName = document.getElementById('tokenName').value; + const amount = document.getElementById('amount').value; + fetch('/swap', { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({token_name: tokenName, amount: amount}) + }) + .then(response => response.json()) + .then(data => alert(data.message)); + }); + } + if (generateApiKeyButton) { generateApiKeyButton.addEventListener('click', async () => { const response = await fetch('/generate_api_key', { method: 'POST' }); diff --git a/crypto/sol/templates/login.html b/crypto/sol/templates/login.html index 7bf3068..49d940c 100644 --- a/crypto/sol/templates/login.html +++ b/crypto/sol/templates/login.html @@ -2,16 +2,17 @@ {% block content %}

Login

-
- - - - - - - -
{% if error %} -

{{ error }}

+

{{ error }}

{% endif %} +
+ +

+ +

+ +
+
+Login with Google + {% endblock %} \ No newline at end of file From 2c2c4ee4df123599b17282243a51c759b7110015 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sun, 13 Oct 2024 23:23:33 +0300 Subject: [PATCH 08/13] starting to add moduls, solana rpc module --- crypto/sol/app.py | 142 +++++++------------ crypto/sol/config.py | 59 ++++++++ crypto/sol/modules/SolanaAPI.py | 239 ++++++++++++++++++++++++++++++++ crypto/sol/modules/utils.py | 27 ++++ 4 files changed, 377 insertions(+), 90 deletions(-) create mode 100644 crypto/sol/config.py create mode 100644 crypto/sol/modules/SolanaAPI.py create mode 100644 crypto/sol/modules/utils.py diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 8848e46..737bc4b 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -24,8 +24,7 @@ from solders.instruction import CompiledInstruction from solders import message from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA from dexscreener import DexscreenerClient -from telegram import Bot -from telegram.constants import ParseMode + import datetime import logging from logging.handlers import RotatingFileHandler @@ -41,35 +40,52 @@ from typing import List, Dict, Any, Tuple import random +from modules.webui import init_app +from modules.storage import init_db, store_transaction + app = Flask(__name__) -# config = load_config() -load_dotenv() -load_dotenv('.env.secret') -# Configuration -DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") -FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") -YOUR_WALLET = os.getenv("YOUR_WALLET") -TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") -SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") -SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") -DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') -BOT_NAME = os.getenv("BOT_NAME") +from config import ( + FOLLOWED_WALLET, + YOUR_WALLET, + SOLANA_WS_URL, + SOLANA_HTTP_URL, + DISPLAY_CURRENCY, + logger, + error_logger +) -logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG) -#logging.basicConfig(level=logging.INFO) +from modules.utils import (send_telegram_message, get_token_prices, get_sol_price, get_wallet_balances, convert_balances_to_currency, get_swap_transaction_details) -# Set up error logger -log_dir = './logs' -log_file = os.path.join(log_dir, 'error.log') -os.makedirs(log_dir, exist_ok=True) -error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5) -error_file_handler.setLevel(logging.ERROR) -error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') ) -error_logger = logging.getLogger('error_logger') -error_logger.setLevel(logging.ERROR) -error_logger.addHandler(error_file_handler) +from modules.SolanaAPI import SolanaAPI, solana_jsonrpc + +# # config = load_config() +# load_dotenv() +# load_dotenv('.env.secret') +# # Configuration +# DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") +# FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") +# YOUR_WALLET = os.getenv("YOUR_WALLET") +# TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") +# SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") +# SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") +# DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') +# BOT_NAME = os.getenv("BOT_NAME") + +# logger = logging.getLogger(__name__) +# logging.basicConfig(level=logging.DEBUG) +# #logging.basicConfig(level=logging.INFO) + +# # Set up error logger +# log_dir = './logs' +# log_file = os.path.join(log_dir, 'error.log') +# os.makedirs(log_dir, exist_ok=True) +# error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5) +# error_file_handler.setLevel(logging.ERROR) +# error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') ) +# error_logger = logging.getLogger('error_logger') +# error_logger.setLevel(logging.ERROR) +# error_logger.addHandler(error_file_handler) # Function to find the latest log file @@ -114,12 +130,6 @@ async def retry_last_log(): # Create the bot with the custom connection pool bot = None -# Token addresses (initialize with some known tokens) -TOKEN_ADDRESSES = { - "SOL": "So11111111111111111111111111111111111111112", - "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", - "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og", -} TOKENS_INFO = {} try: @@ -129,13 +139,13 @@ except Exception as e: logging.error(f"Error loading token info: {str(e)}") # # # # # # # # # # TELEGRAM # # # # # # # # # # -async def send_telegram_message(message): - try: - await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) - logging.info(f"Telegram message sent: {message}") - # logging.info(f"Telegram message dummy sent: {message}") - except Exception as e: - logging.error(f"Error sending Telegram message: {str(e)}") +# async def send_telegram_message(message): +# try: +# await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) +# logging.info(f"Telegram message sent: {message}") +# # logging.info(f"Telegram message dummy sent: {message}") +# except Exception as e: +# logging.error(f"Error sending Telegram message: {str(e)}") @@ -437,6 +447,8 @@ async def get_token_metadata_symbol(mint_address): return None + + METADATA_STRUCT = CStruct( "update_authority" / String, "mint" / String, @@ -793,49 +805,6 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False): print("Error fetching transaction details:", e) -async def solana_jsonrpc(method, params = None, jsonParsed = True): - # target json example: - # data = { - # "jsonrpc": "2.0", - # "id": 1, - # "method": "getTransaction", - # "params": [ - # tx_signature, - # { - # "encoding": "jsonParsed", - # "maxSupportedTransactionVersion": 0 - # } - # ] - # } - # if param is not array, make it array - if not isinstance(params, list): - params = [params] - - data = { - "jsonrpc": "2.0", - "id": 1, - "method": method, - "params": params or [] - } - data["params"].append({"maxSupportedTransactionVersion": 0}) - if jsonParsed: - data["params"][1]["encoding"] = "jsonParsed" - - - try: - # url = 'https://solana.drpc.org' - response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) - response.raise_for_status() # Raises an error for bad responses - result = response.json() - if not 'result' in result or 'error' in result: - print("Error fetching data from Solana RPC:", result) - return None - return result['result'] - except Exception as e: - logging.error(f"Error fetching data from Solana RPC: {e}") - return None - - # # # # # # # # # # Functionality # # # # # # # # # # @@ -1429,12 +1398,7 @@ async def check_PK(): async def main(): global bot, PROCESSING_LOG - # Initialize Telegram Bot - # Create a custom connection pool - conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit - timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout - bot = Bot(TELEGRAM_BOT_TOKEN) # , request=aiohttp.ClientSession(connector=conn_pool, timeout=timeout).request) await send_telegram_message("Solana Agent Started. Connecting to mainnet...") await check_PK() @@ -1464,8 +1428,6 @@ async def main(): await send_telegram_message("Restarting wallet_watch_loop") -from modules.webui import init_app -from modules.storage import init_db async def run_flask(): # loop = asyncio.get_running_loop() diff --git a/crypto/sol/config.py b/crypto/sol/config.py new file mode 100644 index 0000000..e708df8 --- /dev/null +++ b/crypto/sol/config.py @@ -0,0 +1,59 @@ +# config.py + +import os +import logging +from dotenv import load_dotenv +from logging.handlers import RotatingFileHandler + +# Load environment variables +load_dotenv() +load_dotenv('.env.secret') + +# Configuration +DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") +FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") +YOUR_WALLET = os.getenv("YOUR_WALLET") +TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") +SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") +SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") +DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') +BOT_NAME = os.getenv("BOT_NAME") + +# Token addresses (initialize with some known tokens) +TOKEN_ADDRESSES = { + "SOL": "So11111111111111111111111111111111111111112", + "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", + "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og", +} + +# Logging configuration +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +# Set up error logger +log_dir = './logs' +log_file = os.path.join(log_dir, 'error.log') +os.makedirs(log_dir, exist_ok=True) +error_file_handler = RotatingFileHandler( + log_file, + maxBytes=10*1024*1024, + backupCount=5 +) +error_file_handler.setLevel(logging.ERROR) +error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) +error_logger = logging.getLogger('error_logger') +error_logger.setLevel(logging.ERROR) +error_logger.addHandler(error_file_handler) + +# Function to get all configuration +def get_config(): + return { + "DEVELOPER_CHAT_ID": DEVELOPER_CHAT_ID, + "FOLLOWED_WALLET": FOLLOWED_WALLET, + "YOUR_WALLET": YOUR_WALLET, + "TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN, + "SOLANA_WS_URL": SOLANA_WS_URL, + "SOLANA_HTTP_URL": SOLANA_HTTP_URL, + "DISPLAY_CURRENCY": DISPLAY_CURRENCY, + "BOT_NAME": BOT_NAME, + } \ No newline at end of file diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py new file mode 100644 index 0000000..ee4793b --- /dev/null +++ b/crypto/sol/modules/SolanaAPI.py @@ -0,0 +1,239 @@ +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import asyncio +import json +import logging +import random +import websockets +from typing import Optional +import requests +import datetime + +logger = logging.getLogger(__name__) + +SOLANA_ENDPOINTS = ["your_endpoint_1", "your_endpoint_2"] # Add your endpoints here +SUBSCRIBE_INTERVAL = 300 # 5 minutes in seconds + +from config import ( +FOLLOWED_WALLET, SOLANA_HTTP_URL +) + + +class SolanaAPI: + def __init__(self): + self.websocket: Optional[websockets.WebSocketClientProtocol] = None + self.subscription_id: Optional[int] = None + self.message_queue: asyncio.Queue = asyncio.Queue() + + async def connect(self): + while True: + try: + current_url = random.choice(SOLANA_ENDPOINTS) + self.websocket = await websockets.connect(current_url, ping_interval=30, ping_timeout=20) + logger.info(f"Connected to Solana websocket: {current_url}") + return + except Exception as e: + logger.error(f"Failed to connect to {current_url}: {e}") + await asyncio.sleep(5) + + async def subscribe(self): + request = { + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ + {"mentions": [FOLLOWED_WALLET]}, + {"commitment": "confirmed"} + ] + } + await self.websocket.send(json.dumps(request)) + response = await self.websocket.recv() + response_data = json.loads(response) + + if 'result' in response_data: + self.subscription_id = response_data['result'] + logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") + else: + logger.warning(f"Unexpected response: {response_data}") + + async def unsubscribe(self): + if self.subscription_id: + request = { + "jsonrpc": "2.0", + "id": 1, + "method": "logsUnsubscribe", + "params": [self.subscription_id] + } + await self.websocket.send(json.dumps(request)) + logger.info(f"Unsubscribed from subscription id: {self.subscription_id}") + self.subscription_id = None + + async def receive_messages(self): + while True: + try: + message = await self.websocket.recv() + await self.message_queue.put(message) + except websockets.exceptions.ConnectionClosedError: + logger.error("WebSocket connection closed") + break + except Exception as e: + logger.error(f"Error receiving message: {e}") + break + + async def process_messages(self): + while True: + message = await self.message_queue.get() + try: + response_data = json.loads(message) + if 'params' in response_data: + log = response_data['params']['result'] + await process_log(log) + else: + logger.warning(f"Unexpected response: {response_data}") + except json.JSONDecodeError as e: + logger.error(f"Failed to decode JSON: {e}") + except Exception as e: + logger.error(f"An unexpected error occurred while processing message: {e}") + finally: + self.message_queue.task_done() + + +async def solana_jsonrpc(method, params = None, jsonParsed = True): + # target json example: + # data = { + # "jsonrpc": "2.0", + # "id": 1, + # "method": "getTransaction", + # "params": [ + # tx_signature, + # { + # "encoding": "jsonParsed", + # "maxSupportedTransactionVersion": 0 + # } + # ] + # } + # if param is not array, make it array + if not isinstance(params, list): + params = [params] + + data = { + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params or [] + } + data["params"].append({"maxSupportedTransactionVersion": 0}) + if jsonParsed: + data["params"][1]["encoding"] = "jsonParsed" + + + try: + # url = 'https://solana.drpc.org' + response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) + response.raise_for_status() # Raises an error for bad responses + result = response.json() + if not 'result' in result or 'error' in result: + print("Error fetching data from Solana RPC:", result) + return None + return result['result'] + except Exception as e: + logging.error(f"Error fetching data from Solana RPC: {e}") + return None + + +async def process_log(log): + # Implement your log processing logic here + pass + +async def send_telegram_message(message): + # Implement your Telegram message sending logic here + pass + +async def list_initial_wallet_states(): + # Implement your initial wallet state listing logic here + pass + +async def wallet_watch_loop(): + solana_ws = SolanaAPI() + first_subscription = True + + while True: + try: + await solana_ws.connect() + await solana_ws.subscribe() + + if first_subscription: + asyncio.create_task(list_initial_wallet_states()) + first_subscription = False + + await send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") + + receive_task = asyncio.create_task(solana_ws.receive_messages()) + process_task = asyncio.create_task(solana_ws.process_messages()) + + try: + await asyncio.gather(receive_task, process_task) + except asyncio.CancelledError: + pass + finally: + receive_task.cancel() + process_task.cancel() + + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + finally: + await solana_ws.unsubscribe() + if solana_ws.websocket: + await solana_ws.websocket.close() + await send_telegram_message("Reconnecting...") + await asyncio.sleep(5) + +# Example usage +# async def main(): +# account_address = "Vote111111111111111111111111111111111111111" + +async def get_last_transactions(account_address, check_interval=300, limit=1000): + last_check_time = None + last_signature = None + + while True: + current_time = datetime.now() + + if last_check_time is None or (current_time - last_check_time).total_seconds() >= check_interval: + params = [ + account_address, + { + "limit": limit + } + ] + + if last_signature: + params[1]["before"] = last_signature + + result = await solana_jsonrpc("getSignaturesForAddress", params) + + if result: + for signature in result: + if last_signature and signature['signature'] == last_signature: + break + + # Process the transaction + await process_transaction(signature) + + if result: + last_signature = result[0]['signature'] + + last_check_time = current_time + + await asyncio.sleep(1) # Sleep for 1 second before checking again + +async def process_transaction(signature): + # Implement your logic to process each transaction + print(f"Processing transaction: {signature['signature']}") + # You can add more processing logic here, such as storing in a database, + # triggering notifications, etc. + +if __name__ == "__main__": + asyncio.run(wallet_watch_loop()) \ No newline at end of file diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py new file mode 100644 index 0000000..684e99d --- /dev/null +++ b/crypto/sol/modules/utils.py @@ -0,0 +1,27 @@ +# telegram_utils.py +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import aiohttp +import logging +from telegram import Bot +from telegram.constants import ParseMode +from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME + +# Initialize Telegram Bot +# Create a custom connection pool +conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit +timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout + + # bot = Bot(TELEGRAM_BOT_TOKEN) # , request=aiohttp.ClientSession(connector=conn_pool, timeout=timeout).request) +bot = Bot(token=TELEGRAM_BOT_TOKEN) + +async def send_telegram_message(message): + try: + await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) + logging.info(f"Telegram message sent: {message}") + except Exception as e: + logging.error(f"Error sending Telegram message: {str(e)}") + +# You can add more Telegram-related functions here if needed \ No newline at end of file From 921386e7fb0d11074464dbba7b0eee6b8717d70b Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Sun, 13 Oct 2024 23:42:40 +0300 Subject: [PATCH 09/13] compiling after refactoring --- crypto/sol/app.py | 171 ++------------------------------ crypto/sol/modules/SolanaAPI.py | 75 +++++++++----- crypto/sol/modules/utils.py | 42 +++++--- 3 files changed, 89 insertions(+), 199 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 737bc4b..6c68b93 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -55,9 +55,10 @@ from config import ( error_logger ) -from modules.utils import (send_telegram_message, get_token_prices, get_sol_price, get_wallet_balances, convert_balances_to_currency, get_swap_transaction_details) +from modules.utils import (get_token_prices, get_sol_price, get_wallet_balances, convert_balances_to_currency, get_swap_transaction_details) -from modules.SolanaAPI import SolanaAPI, solana_jsonrpc +from modules.SolanaAPI import SolanaAPI, solana_jsonrpc, wallet_watch_loop +from modules.utils import telegram_utils, send_telegram_message # # config = load_config() # load_dotenv() @@ -139,6 +140,11 @@ except Exception as e: logging.error(f"Error loading token info: {str(e)}") # # # # # # # # # # TELEGRAM # # # # # # # # # # +if not telegram_utils.bot: + try: + asyncio.run(telegram_utils.initialize()) + except Exception as e: + logging.error(f"Error initializing Telegram bot: {str(e)}") # async def send_telegram_message(message): # try: # await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) @@ -1210,167 +1216,6 @@ async def follow_move(move): # Helper functions -SOLANA_ENDPOINTS = [ - "wss://api.mainnet-beta.solana.com", - # "wss://solana-api.projectserum.com", - # "wss://rpc.ankr.com/solana", - # "wss://mainnet.rpcpool.com", -] -PING_INTERVAL = 30 -SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes - - -# async def heartbeat(websocket): -# while True: -# try: -# await websocket.ping() -# await asyncio.sleep(PING_INTERVAL) -# except websockets.exceptions.ConnectionClosed: -# break - -_first_subscription = True -_process_task = None -async def wallet_watch_loop(): - global _first_subscription, _process_task - reconnect_delay = 5 - max_reconnect_delay = 60 - - while True: - try: - try: - subscription_id = None - current_url = random.choice(SOLANA_ENDPOINTS) - async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket: - logger.info(f"Connected to Solana websocket: {current_url}") - # heartbeat_task = asyncio.create_task(heartbeat(websocket)) - - while True: - if websocket.closed: - break - - subscription_id = await subscribe(websocket) - if subscription_id is not None: - await send_telegram_message(f"Solana mainnet connected ({subscription_id})...") - if _first_subscription: - asyncio.create_task( list_initial_wallet_states()) - _first_subscription = False - _process_task = asyncio.create_task(process_messages(websocket, subscription_id)) - while True: - try:# drop subscription now - await process_messages(websocket, subscription_id) - # await asyncio.run(_process_task) - # await asyncio.wait_for(_process_task, timeout=SUBSCRIBE_INTERVAL) - except asyncio.TimeoutError: - # Timeout occurred, time to resubscribe - if not PROCESSING_LOG: - _process_task.cancel() - try: - await _process_task - except asyncio.CancelledError: - pass - await unsubscribe(websocket, subscription_id) - new_sub_id = await subscribe(websocket) - if new_sub_id is None: break - if new_sub_id > 1: # we sometimes get True instead of integer, so we cje - subscription_id = new_sub_id - logger.info(f"New subscription created with ID: {subscription_id}") - elif new_sub_id is True: - # Already subscribed - logger.info("Already subscribed, continuing with existing subscription") - if subscription_id: - process_task = asyncio.create_task(process_messages(websocket, subscription_id)) - - else: - # process_messages completed (shouldn't happen unless there's an error) - break - else: - send_telegram_message("Failed to connect. Retrying...") - - # heartbeat_task.cancel() - - except websockets.exceptions.WebSocketException as e: - logger.error(f"WebSocket error: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - - await unsubscribe(websocket, subscription_id) - await send_telegram_message("reconnecting...") - logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") - websocket.close() - except Exception as e: - logger.error(f"An unexpected error occurred - breaking watch loop: {e}") - - await asyncio.sleep(reconnect_delay) - reconnect_delay = min(reconnect_delay * 1.2, max_reconnect_delay) - -async def subscribe(websocket): - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsSubscribe", - "params": [ - {"mentions": [FOLLOWED_WALLET]}, - {"commitment": "confirmed"} - ] - } - try: - await websocket.send(json.dumps(request)) - logger.info("Subscription request sent") - - response = await websocket.recv() - response_data = json.loads(response) - - if 'result' in response_data: - subscription_id = response_data['result'] - logger.info(f"Subscription successful. Subscription id: {subscription_id}") - return subscription_id - else: - logger.warning(f"Unexpected response: {response_data}") - return None - except websockets.exceptions.ConnectionClosedError as e: - logger.error(f"Connection closed unexpectedly: {e}") - await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") - await websocket.close() - return None - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - return None - -async def unsubscribe(websocket, subscription_id): - if subscription_id: - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsUnsubscribe", - "params": [subscription_id] - } - await websocket.send(json.dumps(request)) - logger.info(f"Unsubscribed from subscription id: {subscription_id}") - subscription_id = None - -async def process_messages(websocket, subscription_id): - try: - while True: - response = await websocket.recv() - response_data = json.loads(response) - logger.debug(f"Received response: {response_data}") - - if 'params' in response_data: - log = response_data['params']['result'] - logger.debug(f"Received transaction log: {log}") - asyncio.create_task(process_log(log)) - else: - logger.warning(f"Unexpected response: {response_data}") - - except websockets.exceptions.ConnectionClosedError as e: - logger.error(f"Connection closed unexpectedly: {e}") - await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") - pass - except json.JSONDecodeError as e: - logger.error(f"Failed to decode JSON: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - pk = os.getenv("PK") async def check_PK(): diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index ee4793b..661a86e 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -13,8 +13,14 @@ import datetime logger = logging.getLogger(__name__) -SOLANA_ENDPOINTS = ["your_endpoint_1", "your_endpoint_2"] # Add your endpoints here -SUBSCRIBE_INTERVAL = 300 # 5 minutes in seconds +SOLANA_ENDPOINTS = [ + "wss://api.mainnet-beta.solana.com", + # "wss://solana-api.projectserum.com", + # "wss://rpc.ankr.com/solana", + # "wss://mainnet.rpcpool.com", +] +PING_INTERVAL = 30 +SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes from config import ( FOLLOWED_WALLET, SOLANA_HTTP_URL @@ -23,9 +29,9 @@ FOLLOWED_WALLET, SOLANA_HTTP_URL class SolanaAPI: def __init__(self): - self.websocket: Optional[websockets.WebSocketClientProtocol] = None - self.subscription_id: Optional[int] = None - self.message_queue: asyncio.Queue = asyncio.Queue() + self.websocket = None + self.subscription_id = None + self.message_queue = asyncio.Queue() async def connect(self): while True: @@ -38,37 +44,50 @@ class SolanaAPI: logger.error(f"Failed to connect to {current_url}: {e}") await asyncio.sleep(5) - async def subscribe(self): + async def ws_jsonrpc(self, method, params=None): + if not isinstance(params, list): + params = [params] if params is not None else [] + request = { "jsonrpc": "2.0", "id": 1, - "method": "logsSubscribe", - "params": [ - {"mentions": [FOLLOWED_WALLET]}, - {"commitment": "confirmed"} - ] + "method": method, + "params": params } + await self.websocket.send(json.dumps(request)) response = await self.websocket.recv() response_data = json.loads(response) - + if 'result' in response_data: - self.subscription_id = response_data['result'] - logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") + return response_data['result'] + elif 'error' in response_data: + logger.error(f"Error in WebSocket RPC call: {response_data['error']}") + return None else: logger.warning(f"Unexpected response: {response_data}") + return None + + async def subscribe(self): + params = [ + {"mentions": [FOLLOWED_WALLET]}, + {"commitment": "confirmed"} + ] + result = await self.ws_jsonrpc("logsSubscribe", params) + if result is not None: + self.subscription_id = result + logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") + else: + logger.error("Failed to subscribe") async def unsubscribe(self): if self.subscription_id: - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsUnsubscribe", - "params": [self.subscription_id] - } - await self.websocket.send(json.dumps(request)) - logger.info(f"Unsubscribed from subscription id: {self.subscription_id}") - self.subscription_id = None + result = await self.ws_jsonrpc("logsUnsubscribe", [self.subscription_id]) + if result: + logger.info(f"Unsubscribed from subscription id: {self.subscription_id}") + self.subscription_id = None + else: + logger.error(f"Failed to unsubscribe from subscription id: {self.subscription_id}") async def receive_messages(self): while True: @@ -83,6 +102,16 @@ class SolanaAPI: break async def process_messages(self): + while True: + message = await self.message_queue.get() + # Process the message here + # You can add your message processing logic + logger.info(f"Received message: {message}") + + async def close(self): + if self.websocket: + await self.websocket.close() + logger.info("WebSocket connection closed") while True: message = await self.message_queue.get() try: diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py index 684e99d..b70b567 100644 --- a/crypto/sol/modules/utils.py +++ b/crypto/sol/modules/utils.py @@ -9,19 +9,35 @@ from telegram import Bot from telegram.constants import ParseMode from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME -# Initialize Telegram Bot -# Create a custom connection pool -conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit -timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout +class TelegramUtils: + def __init__(self): + self.bot = None + self.conn_pool = None + self.timeout = None - # bot = Bot(TELEGRAM_BOT_TOKEN) # , request=aiohttp.ClientSession(connector=conn_pool, timeout=timeout).request) -bot = Bot(token=TELEGRAM_BOT_TOKEN) + async def initialize(self): + # Create a custom connection pool + self.conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit + self.timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout -async def send_telegram_message(message): - try: - await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) - logging.info(f"Telegram message sent: {message}") - except Exception as e: - logging.error(f"Error sending Telegram message: {str(e)}") + # Initialize Telegram Bot + self.bot = Bot(token=TELEGRAM_BOT_TOKEN) -# You can add more Telegram-related functions here if needed \ No newline at end of file + async def send_telegram_message(self, message): + if not self.bot: + await self.initialize() + + try: + await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) + logging.info(f"Telegram message sent: {message}") + except Exception as e: + logging.error(f"Error sending Telegram message: {str(e)}") + + async def close(self): + if self.conn_pool: + await self.conn_pool.close() + +# Create a global instance of TelegramUtils +telegram_utils = TelegramUtils() + +# You can add more Telegram-related methods to the TelegramUtils class if needed \ No newline at end of file From 6b6018cd85ddb80b2dc805906ace83845d276e21 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 14 Oct 2024 11:58:10 +0300 Subject: [PATCH 10/13] launch --- .vscode/launch.json | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 8906574..b7e847f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -39,8 +39,8 @@ "console": "integratedTerminal", "internalConsoleOptions": "neverOpen", "env": { - "NODE_ENV": "demo" - "OPENAI_API_KEY": + "NODE_ENV": "demo", + "OPENAI_API_KEY":"" }, "skipFiles": [ "/**" @@ -69,7 +69,13 @@ "program": "${file}" }, { - "name": "Python Debugger: Python File with Conda", + "name": "py: Sol app.py", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/crypto/sol/app.py", + }, + { + "name": "Python Debugger: Python File with Conda (py)", "type": "debugpy", "request": "launch", "program": "${file}", From 8d714a98013e72d96e89ea51659805206fd2f889 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 14 Oct 2024 17:34:18 +0300 Subject: [PATCH 11/13] solana API in module --- crypto/sol/modules/SolanaAPI.py | 168 ++++++++++++++++---------------- 1 file changed, 84 insertions(+), 84 deletions(-) diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index 661a86e..bc19389 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -26,12 +26,14 @@ from config import ( FOLLOWED_WALLET, SOLANA_HTTP_URL ) +from modules.utils import telegram_utils -class SolanaAPI: - def __init__(self): +class SolanaWS: + def __init__(self, on_message: Optional[callable] = None): self.websocket = None self.subscription_id = None self.message_queue = asyncio.Queue() + self.on_message = on_message async def connect(self): while True: @@ -104,8 +106,7 @@ class SolanaAPI: async def process_messages(self): while True: message = await self.message_queue.get() - # Process the message here - # You can add your message processing logic + await self.on_message(message) logger.info(f"Received message: {message}") async def close(self): @@ -171,98 +172,97 @@ async def solana_jsonrpc(method, params = None, jsonParsed = True): logging.error(f"Error fetching data from Solana RPC: {e}") return None - -async def process_log(log): - # Implement your log processing logic here - pass - -async def send_telegram_message(message): - # Implement your Telegram message sending logic here - pass - -async def list_initial_wallet_states(): - # Implement your initial wallet state listing logic here - pass - -async def wallet_watch_loop(): - solana_ws = SolanaAPI() - first_subscription = True - - while True: - try: - await solana_ws.connect() - await solana_ws.subscribe() - - if first_subscription: - asyncio.create_task(list_initial_wallet_states()) - first_subscription = False - - await send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") - - receive_task = asyncio.create_task(solana_ws.receive_messages()) - process_task = asyncio.create_task(solana_ws.process_messages()) - - try: - await asyncio.gather(receive_task, process_task) - except asyncio.CancelledError: - pass - finally: - receive_task.cancel() - process_task.cancel() - - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - finally: - await solana_ws.unsubscribe() - if solana_ws.websocket: - await solana_ws.websocket.close() - await send_telegram_message("Reconnecting...") - await asyncio.sleep(5) - -# Example usage -# async def main(): -# account_address = "Vote111111111111111111111111111111111111111" +class SolanaAPI: -async def get_last_transactions(account_address, check_interval=300, limit=1000): - last_check_time = None - last_signature = None + def __init__(self, process_log_callback, send_telegram_message_callback, list_initial_wallet_states_callback): + self.process_log = process_log_callback + self.list_initial_wallet_states = list_initial_wallet_states_callback - while True: - current_time = datetime.now() + async def process_messages(self, solana_ws): + while True: + message = await solana_ws.message_queue.get() + await self.process_log(message) + + async def wallet_watch_loop(): + solana_ws = SolanaWS(on_message=process_log) + first_subscription = True - if last_check_time is None or (current_time - last_check_time).total_seconds() >= check_interval: - params = [ - account_address, - { - "limit": limit - } - ] + while True: + try: + await solana_ws.connect() + await solana_ws.subscribe() - if last_signature: - params[1]["before"] = last_signature + if first_subscription: + asyncio.create_task(self.list_initial_wallet_states()) + first_subscription = False - result = await solana_jsonrpc("getSignaturesForAddress", params) + await telegram_utils.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") - if result: - for signature in result: - if last_signature and signature['signature'] == last_signature: - break + receive_task = asyncio.create_task(solana_ws.receive_messages()) + process_task = asyncio.create_task(solana_ws.process_messages()) - # Process the transaction - await process_transaction(signature) + try: + await asyncio.gather(receive_task, process_task) + except asyncio.CancelledError: + pass + finally: + receive_task.cancel() + process_task.cancel() + + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + finally: + await solana_ws.unsubscribe() + if solana_ws.websocket: + await solana_ws.websocket.close() + await telegram_utils.send_telegram_message("Reconnecting...") + await asyncio.sleep(5) + + async def process_transaction(signature): + # Implement your logic to process each transaction + print(f"Processing transaction: {signature['signature']}") + # You can add more processing logic here, such as storing in a database, + # triggering notifications, etc. + # Example usage + # async def main(): + # account_address = "Vote111111111111111111111111111111111111111" + + async def get_last_transactions(account_address, check_interval=300, limit=1000): + last_check_time = None + last_signature = None + + while True: + current_time = datetime.now() + + if last_check_time is None or (current_time - last_check_time).total_seconds() >= check_interval: + params = [ + account_address, + { + "limit": limit + } + ] + + if last_signature: + params[1]["before"] = last_signature + + result = await solana_jsonrpc("getSignaturesForAddress", params) if result: - last_signature = result[0]['signature'] + for signature in result: + if last_signature and signature['signature'] == last_signature: + break - last_check_time = current_time + # Process the transaction + await process_transaction(signature) + + if result: + last_signature = result[0]['signature'] + + last_check_time = current_time + + await asyncio.sleep(1) # Sleep for 1 second before checking again - await asyncio.sleep(1) # Sleep for 1 second before checking again -async def process_transaction(signature): - # Implement your logic to process each transaction - print(f"Processing transaction: {signature['signature']}") - # You can add more processing logic here, such as storing in a database, - # triggering notifications, etc. if __name__ == "__main__": asyncio.run(wallet_watch_loop()) \ No newline at end of file From d63d3d41bc4d34fafadf5beb214a74c06e74f389 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 15 Oct 2024 23:19:30 +0300 Subject: [PATCH 12/13] big refactoring --- crypto/sol/app.py | 609 +++-------------------------- crypto/sol/modules/SolanaAPI.py | 656 +++++++++++++++++++++++++++----- crypto/sol/modules/utils.py | 2 +- 3 files changed, 612 insertions(+), 655 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 6c68b93..56821b1 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -5,7 +5,6 @@ from flask import Flask, render_template, request, jsonify from solana.rpc.async_api import AsyncClient from solana.transaction import Signature from solana.rpc.websocket_api import connect -from solana.rpc.types import TokenAccountOpts, TxOpts from solana.rpc.commitment import Confirmed, Processed from solana.transaction import Transaction from spl.token.client import Token @@ -24,6 +23,7 @@ from solders.instruction import CompiledInstruction from solders import message from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA from dexscreener import DexscreenerClient +from solana.rpc.types import TokenAccountOpts, TxOpts import datetime import logging @@ -55,38 +55,9 @@ from config import ( error_logger ) -from modules.utils import (get_token_prices, get_sol_price, get_wallet_balances, convert_balances_to_currency, get_swap_transaction_details) -from modules.SolanaAPI import SolanaAPI, solana_jsonrpc, wallet_watch_loop -from modules.utils import telegram_utils, send_telegram_message - -# # config = load_config() -# load_dotenv() -# load_dotenv('.env.secret') -# # Configuration -# DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") -# FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") -# YOUR_WALLET = os.getenv("YOUR_WALLET") -# TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") -# SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") -# SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") -# DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') -# BOT_NAME = os.getenv("BOT_NAME") - -# logger = logging.getLogger(__name__) -# logging.basicConfig(level=logging.DEBUG) -# #logging.basicConfig(level=logging.INFO) - -# # Set up error logger -# log_dir = './logs' -# log_file = os.path.join(log_dir, 'error.log') -# os.makedirs(log_dir, exist_ok=True) -# error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5) -# error_file_handler.setLevel(logging.ERROR) -# error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') ) -# error_logger = logging.getLogger('error_logger') -# error_logger.setLevel(logging.ERROR) -# error_logger.addHandler(error_file_handler) +from modules.SolanaAPI import SolanaAPI, SolanaDEX +from modules.utils import telegram_utils # Function to find the latest log file @@ -145,7 +116,7 @@ if not telegram_utils.bot: asyncio.run(telegram_utils.initialize()) except Exception as e: logging.error(f"Error initializing Telegram bot: {str(e)}") -# async def send_telegram_message(message): +# async def telegram_utils.send_telegram_message(message): # try: # await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) # logging.info(f"Telegram message sent: {message}") @@ -161,196 +132,6 @@ if not telegram_utils.bot: # # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # # -async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: - global TOKENS_INFO - - # Skip for USD - prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"} - remaining_tokens = [addr for addr in token_addresses if addr not in prices] - - # Try CoinGecko - coingecko_prices = await get_prices_from_coingecko(remaining_tokens) - prices.update(coingecko_prices) - - - # For remaining missing tokens, try Jupiter - missing_tokens = set(remaining_tokens) - set(prices.keys()) - if missing_tokens: - jupiter_prices = await get_prices_from_jupiter(list(missing_tokens)) - prices.update(jupiter_prices) - - - # For tokens not found in CoinGecko, use DexScreener - missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys()) - if missing_tokens: - dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens)) - prices.update(dexscreener_prices) - - # For remaining missing tokens, try Raydium - missing_tokens = set(remaining_tokens) - set(prices.keys()) - if missing_tokens: - raydium_prices = await get_prices_from_raydium(list(missing_tokens)) - prices.update(raydium_prices) - - # For remaining missing tokens, try Orca - missing_tokens = set(remaining_tokens) - set(prices.keys()) - if missing_tokens: - orca_prices = await get_prices_from_orca(list(missing_tokens)) - prices.update(orca_prices) - - # If any tokens are still missing, set their prices to 0 - for token in set(token_addresses) - set(prices.keys()): - prices[token] = 0.0 - logging.warning(f"Price not found for token {token}. Setting to 0.") - - for token, price in prices.items(): - token_info = TOKENS_INFO.setdefault(token, {}) - if 'symbol' not in token_info: - token_info['symbol'] = await get_token_metadata_symbol(token) - token_info['price'] = price - - return prices - - -async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]: - base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana" - prices = {} - - async def fetch_single_price(session, address): - params = { - "contract_addresses": address, - "vs_currencies": DISPLAY_CURRENCY.lower() - } - try: - async with session.get(base_url, params=params) as response: - if response.status == 200: - data = await response.json() - if address in data and DISPLAY_CURRENCY.lower() in data[address]: - return address, data[address][DISPLAY_CURRENCY.lower()] - else: - logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}") - return address, None - - async with aiohttp.ClientSession() as session: - tasks = [fetch_single_price(session, address) for address in token_addresses] - results = await asyncio.gather(*tasks) - - for address, price in results: - if price is not None: - prices[address] = price - - return prices - -async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]: - base_url = "https://api.dexscreener.com/latest/dex/tokens/" - prices = {} - - try: - async with aiohttp.ClientSession() as session: - tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] - results = await asyncio.gather(*tasks) - - for address, result in zip(token_addresses, results): - if result and 'pairs' in result and result['pairs']: - pair = result['pairs'][0] # Use the first pair (usually the most liquid) - prices[address] = float(pair['priceUsd']) - else: - logging.warning(f"No price data found on DexScreener for token {address}") - except Exception as e: - logging.error(f"Error fetching token prices from DexScreener: {str(e)}") - - return prices - -async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]: - url = "https://price.jup.ag/v4/price" - params = { - "ids": ",".join(token_addresses) - } - prices = {} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url, params=params) as response: - if response.status == 200: - data = await response.json() - for address, price_info in data.get('data', {}).items(): - if 'price' in price_info: - prices[address] = float(price_info['price']) - else: - logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching token prices from Jupiter: {str(e)}") - return prices - -# New function for Raydium -async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]: - url = "https://api.raydium.io/v2/main/price" - prices = {} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - for address in token_addresses: - if address in data: - prices[address] = float(data[address]) - else: - logging.error(f"Failed to get token prices from Raydium. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching token prices from Raydium: {str(e)}") - return prices - -# New function for Orca -async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]: - url = "https://api.orca.so/allTokens" - prices = {} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - for token_info in data: - if token_info['mint'] in token_addresses: - prices[token_info['mint']] = float(token_info['price']) - else: - logging.error(f"Failed to get token prices from Orca. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching token prices from Orca: {str(e)}") - return prices - - -async def fetch_token_data(session, url): - try: - async with session.get(url) as response: - if response.status == 200: - return await response.json() - else: - logging.error(f"Failed to fetch data from {url}. Status: {response.status}") - return None - except Exception as e: - logging.error(f"Error fetching data from {url}: {str(e)}") - return None - -async def get_sol_price() -> float: - url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}" - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - return data['solana'][DISPLAY_CURRENCY.lower()] - else: - logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}") - return await get_sol_price_from_dexscreener() - -async def get_sol_price_from_dexscreener() -> float: - sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address - prices = await get_prices_from_dexscreener([sol_address]) - return prices.get(sol_address, 0.0) - # # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # # @@ -415,43 +196,6 @@ from spl.token.async_client import AsyncToken from spl.token.constants import TOKEN_PROGRAM_ID from borsh_construct import String, CStruct -async def get_token_metadata_symbol(mint_address): - global TOKENS_INFO - - if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]: - return TOKENS_INFO[mint_address].get('symbol') - - try: - account_data_result = await solana_jsonrpc("getAccountInfo", mint_address) - if 'value' in account_data_result and 'data' in account_data_result['value']: - account_data_data = account_data_result['value']['data'] - if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: - account_data_info = account_data_data['parsed']['info'] - if 'decimals' in account_data_info: - if mint_address in TOKENS_INFO: - TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals'] - else: - TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']} - if 'tokenName' in account_data_info: - if mint_address in TOKENS_INFO: - TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName'] - else: - TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']} - - metadata = await get_token_metadata(mint_address) - if metadata: - if mint_address in TOKENS_INFO: - TOKENS_INFO[mint_address].update(metadata) - else: - TOKENS_INFO[mint_address] = metadata - await save_token_info() - # TOKENS_INFO[mint_address] = metadata - # return metadata.get('symbol') or metadata.get('name') - return TOKENS_INFO[mint_address].get('symbol') - except Exception as e: - logging.error(f"Error fetching token name for {mint_address}: {str(e)}") - return None - @@ -557,80 +301,6 @@ async def get_token_metadata(mint_address): return None -async def get_wallet_balances(wallet_address, doGetTokenName=True): - balances = {} - logging.info(f"Getting balances for wallet: {wallet_address}") - global TOKENS_INFO - try: - response = await solana_client.get_token_accounts_by_owner_json_parsed( - Pubkey.from_string(wallet_address), - opts=TokenAccountOpts( - program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") - ), - commitment=Confirmed - ) - - if response.value: - for account in response.value: - try: - parsed_data = account.account.data.parsed - if isinstance(parsed_data, dict) and 'info' in parsed_data: - info = parsed_data['info'] - if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info: - mint = info['mint'] - decimals = info['tokenAmount']['decimals'] - amount = float(info['tokenAmount']['amount'])/10**decimals - if amount > 0: - if mint in TOKENS_INFO: - token_name = TOKENS_INFO[mint].get('symbol') - elif doGetTokenName: - token_name = await get_token_metadata_symbol(mint) or 'N/A' - # sleep for 1 second to avoid rate limiting - await asyncio.sleep(2) - - TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals) - TOKENS_INFO[mint]['decimals'] = decimals - balances[mint] = { - 'name': token_name or 'N/A', - 'address': mint, - 'amount': amount, - 'decimals': decimals - } - # sleep for 1 second to avoid rate limiting - logging.debug(f"Account balance for {token_name} ({mint}): {amount}") - else: - logging.warning(f"Unexpected data format for account: {account}") - except Exception as e: - logging.error(f"Error parsing account data: {str(e)}") - - sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) - if sol_balance.value is not None: - balances['SOL'] = { - 'name': 'SOL', - 'address': 'SOL', - 'amount': sol_balance.value / 1e9 - } - else: - logging.warning(f"SOL balance response missing for wallet: {wallet_address}") - - except Exception as e: - logging.error(f"Error getting wallet balances: {str(e)}") - logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") - return balances - -async def convert_balances_to_currency(balances , sol_price): - converted_balances = {} - for address, info in balances.items(): - converted_balance = info.copy() # Create a copy of the original info - if info['name'] == 'SOL': - converted_balance['value'] = info['amount'] * sol_price - elif address in TOKEN_PRICES: - converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] - else: - converted_balance['value'] = None # Price not available - logging.warning(f"Price not available for token {info['name']} ({address})") - converted_balances[address] = converted_balance - return converted_balances async def get_swap_transaction_details(tx_signature_str): @@ -675,201 +345,10 @@ async def get_swap_transaction_details(tx_signature_str): return None - -# # # RAW Solana API RPC # # # - -#this is the meat of the application -async def get_transaction_details_rpc(tx_signature, readfromDump=False): - global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO - try: - if readfromDump and os.path.exists('./logs/transation_details.json'): - with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details - transaction_details = json.load(f) - return transaction_details - else: - transaction_details = await solana_jsonrpc("getTransaction", tx_signature) - with open('./logs/transation_details.json', 'w') as f: - json.dump(transaction_details, f, indent=2) - - if transaction_details is None: - logging.error(f"Error fetching transaction details for {tx_signature}") - return None - - # Initialize default result structure - parsed_result = { - "order_id": None, - "token_in": None, - "token_out": None, - "amount_in": 0, - "amount_out": 0, - "amount_in_USD": 0, - "amount_out_USD": 0, - "percentage_swapped": 0 - } - - # Extract order_id from logs - log_messages = transaction_details.get("meta", {}).get("logMessages", []) - for log in log_messages: - if "order_id" in log: - parsed_result["order_id"] = log.split(":")[2].strip() - break - - # Extract token transfers from innerInstructions - inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) - for instruction_set in inner_instructions: - for instruction in instruction_set.get('instructions', []): - if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked': - info = instruction['parsed']['info'] - mint = info['mint'] - amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals - - # Determine which token is being swapped in and out based on zero balances - if parsed_result["token_in"] is None and amount > 0: - parsed_result["token_in"] = mint - parsed_result["amount_in"] = amount - - - if parsed_result["token_in"] is None or parsed_result["token_out"] is None: - # if we've failed to extract token_in and token_out from the transaction details, try a second method - inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) - transfers = [] - - for instruction_set in inner_instructions: - for instruction in instruction_set.get('instructions', []): - if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']: - info = instruction['parsed']['info'] - amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount']) - decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0 - adjusted_amount = amount / (10 ** decimals) - # adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0)) - transfers.append({ - 'mint': info.get('mint'), - 'amount': adjusted_amount, - 'source': info['source'], - 'destination': info['destination'] - }) - - # Identify token_in and token_out - if len(transfers) >= 2: - parsed_result["token_in"] = transfers[0]['mint'] - parsed_result["amount_in"] = transfers[0]['amount'] - parsed_result["token_out"] = transfers[-1]['mint'] - parsed_result["amount_out"] = transfers[-1]['amount'] - - # If mint is not provided, query the Solana network for the account data - if parsed_result["token_in"] is None or parsed_result["token_out"] is None: - #for transfer in transfers: - # do only first and last transfer - for transfer in [transfers[0], transfers[-1]]: - if transfer['mint'] is None: - # Query the Solana network for the account data - account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source']) - - if 'value' in account_data_result and 'data' in account_data_result['value']: - account_data_value = account_data_result['value'] - account_data_data = account_data_value['data'] - if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: - account_data_info = account_data_data['parsed']['info'] - if 'mint' in account_data_info: - transfer['mint'] = account_data_info['mint'] - if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]: - await get_token_metadata_symbol(transfer['mint']) - # get actual prices - current_price = await get_token_prices([transfer['mint']]) - - if parsed_result["token_in"] is None: - parsed_result["token_in"] = transfer['mint'] - parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol'] - parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] - parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']]) - elif parsed_result["token_out"] is None: - parsed_result["token_out"] = transfer['mint'] - parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol'] - parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] - parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price'] - - pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', []) - for balance in pre_balalnces: - if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET: - parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals'] - break - - - # Calculate percentage swapped - try: - if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0: - parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100 - else: - # calculate based on total wallet value: FOLLOWED_WALLET_VALUE - parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100 - except Exception as e: - logging.error(f"Error calculating percentage swapped: {e}") - - return parsed_result - - except requests.exceptions.RequestException as e: - print("Error fetching transaction details:", e) - - # # # # # # # # # # Functionality # # # # # # # # # # -async def list_initial_wallet_states(): - global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES - global TOKENS_INFO # new - - followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) - your_wallet_balances = await get_wallet_balances(YOUR_WALLET) - - all_token_addresses = list(set(followed_wallet_balances.keys()) | - set(your_wallet_balances.keys()) | - set(TOKEN_ADDRESSES.values())) - - TOKEN_PRICES = await get_token_prices(all_token_addresses) - sol_price = await get_sol_price() - - followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) - your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) - - - TOKEN_ADDRESSES = { - address: info for address, - info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0 - } - logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") - - followed_wallet_state = [] - FOLLOWED_WALLET_VALUE = 0 - for address, info in followed_converted_balances.items(): - if info['value'] is not None and info['value'] > 0: - followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})") - FOLLOWED_WALLET_VALUE += info['value'] - - your_wallet_state = [] - YOUR_WALLET_VALUE = 0 - for address, info in your_converted_balances.items(): - if info['value'] is not None and info['value'] > 0: - your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") - YOUR_WALLET_VALUE += info['value'] - - message = ( - f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" - f"Followed Wallet ({FOLLOWED_WALLET}):\n" - f"{chr(10).join(followed_wallet_state)}\n" - f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" - f"Your Wallet ({YOUR_WALLET}):\n" - f"{chr(10).join(your_wallet_state)}\n" - f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" - f"Monitored Tokens:\n" - f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}" - ) - - logging.info(message) - await send_telegram_message(message) - - # save token info to file - await save_token_info() def safe_get_property(info, property_name, default='Unknown'): if not isinstance(info, dict): @@ -887,7 +366,7 @@ async def get_transaction_details_with_retry(transaction_id, retry_delay = 8, ma # qwery every 5 seconds for the transaction details untill not None or 30 seconds for _ in range(max_retries): try: - tx_details = await get_transaction_details_rpc(transaction_id) + tx_details = await solanaAPI.get_transaction_details_rpc(transaction_id) if tx_details is not None: break except Exception as e: @@ -1015,13 +494,13 @@ async def process_log(log_result): f"Amount In USD: {tr_details['amount_in_USD']:.2f}\n" f"Percentage Swapped: {tr_details['percentage_swapped']:.2f}%" ) - await send_telegram_message(message_text) + await telegram_utils.send_telegram_message(message_text) await follow_move(tr_details) await save_token_info() except Exception as e: logging.error(f"Error aquiring log details and following: {e}") - await send_telegram_message(f"Not followed! Error following move.") + await telegram_utils.send_telegram_message(f"Not followed! Error following move.") @@ -1072,14 +551,14 @@ def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: async def follow_move(move): - your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) + your_balances = await solanaAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False) your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) if your_balance_info is not None: # Use the balance print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}") else: print("No ballance found for {move['symbol_in']}. Skipping move.") - await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") + await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") return your_balance = your_balance_info['amount'] @@ -1087,12 +566,12 @@ async def follow_move(move): token_info = TOKENS_INFO.get(move['token_in']) token_name_in = token_info.get('symbol') or await get_token_metadata(move['token_in']) - token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await get_token_metadata_symbol(move['token_out']) + token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await solanaAPI.get_token_metadata_symbol(move['token_out']) if not your_balance: msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move." logging.warning(msg) - await send_telegram_message(msg) + await telegram_utils.send_telegram_message(msg) return # move["percentage_swapped"] = (move["amount_out"] / move["amount_in"]) * 100 @@ -1119,7 +598,7 @@ async def follow_move(move): f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway." ) logging.warning(msg) - await send_telegram_message(msg) + await telegram_utils.send_telegram_message(msg) try: try: @@ -1129,7 +608,7 @@ async def follow_move(move): ) # logging.info(notification) # error_logger.info(notification) - # await send_telegram_message(notification) + # await telegram_utils.send_telegram_message(notification) except Exception as e: logging.error(f"Error sending notification: {e}") @@ -1159,7 +638,7 @@ async def follow_move(move): # append to notification notification += f"\n\nTransaction: {transaction_id}" - await send_telegram_message(f"Follow Transaction Sent: {transaction_id}") + await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}") tx_details = await get_transaction_details_with_retry(transaction_id) if tx_details is not None: @@ -1173,7 +652,7 @@ async def follow_move(move): # log the errors to /logs/errors.log error_logger.error(error_message) error_logger.exception(e) - await send_telegram_message(error_message) + await telegram_utils.send_telegram_message(error_message) amount = amount * 0.75 await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) @@ -1198,7 +677,7 @@ async def follow_move(move): f"\n\nTransaction: {transaction_id}" ) logging.info(notification) - await send_telegram_message(notification) + await telegram_utils.send_telegram_message(notification) except Exception as e: logging.error(f"Error sending notification: {e}") @@ -1210,9 +689,9 @@ async def follow_move(move): error_logger.exception(e) \ # if error_message contains 'Program log: Error: insufficient funds' if 'insufficient funds' in error_message: - await send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.") + await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.") else: - await send_telegram_message(error_message) + await telegram_utils.send_telegram_message(error_message) # Helper functions @@ -1236,41 +715,45 @@ async def check_PK(): if not pk: logging.error("Private key not found in environment variables. Will not be able to sign transactions.") # send TG warning message - await send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.") + await telegram_utils.send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.") +solanaAPI = SolanaAPI(process_transaction_callback=process_log) + async def main(): - global bot, PROCESSING_LOG + global solanaAPI, bot, PROCESSING_LOG - await send_telegram_message("Solana Agent Started. Connecting to mainnet...") + await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") await check_PK() # new: restart wallet_watch_loop every hour - while True: - wallet_watch_task = asyncio.create_task(wallet_watch_loop()) + await solanaAPI.wallet_watch_loop() + + # while True: + # wallet_watch_task = asyncio.create_task(solanaAPI.wallet_watch_loop()) - try: - # Wait for an hour or until the task completes, whichever comes first - await asyncio.wait_for(wallet_watch_task, timeout=3600) - except asyncio.TimeoutError: - # If an hour has passed, cancel the task if not PROCESSING - if PROCESSING_LOG: - logging.info("wallet_watch_loop is processing logs. Will not restart.") - await send_telegram_message("wallet_watch_loop is processing logs. Will not restart.") - else: - wallet_watch_task.cancel() - try: - await wallet_watch_task - except asyncio.CancelledError: - logging.info("wallet_watch_loop was cancelled after running for an hour") - except Exception as e: - logging.error(f"Error in wallet_watch_loop: {str(e)}") - await send_telegram_message(f"Error in wallet_watch_loop: {str(e)}") + # try: + # # Wait for an hour or until the task completes, whichever comes first + # await asyncio.wait_for(wallet_watch_task, timeout=3600) + # except asyncio.TimeoutError: + # # If an hour has passed, cancel the task if not PROCESSING + # if PROCESSING_LOG: + # logging.info("wallet_watch_loop is processing logs. Will not restart.") + # await telegram_utils.send_telegram_message("wallet_watch_loop is processing logs. Will not restart.") + # else: + # wallet_watch_task.cancel() + # try: + # await wallet_watch_task + # except asyncio.CancelledError: + # logging.info("wallet_watch_loop was cancelled after running for an hour") + # except Exception as e: + # logging.error(f"Error in wallet_watch_loop: {str(e)}") + # await telegram_utils.send_telegram_message(f"Error in wallet_watch_loop: {str(e)}") - logging.info("Restarting wallet_watch_loop") - await send_telegram_message("Restarting wallet_watch_loop") + # logging.info("Restarting wallet_watch_loop") + # await telegram_utils.send_telegram_message("Restarting wallet_watch_loop") diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index bc19389..e0fa5cb 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -1,5 +1,7 @@ import sys import os + +import aiohttp sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import asyncio @@ -7,23 +9,21 @@ import json import logging import random import websockets -from typing import Optional +from typing import Dict, List, Optional import requests -import datetime +from datetime import datetime +from solana.rpc.types import TokenAccountOpts, TxOpts logger = logging.getLogger(__name__) SOLANA_ENDPOINTS = [ "wss://api.mainnet-beta.solana.com", - # "wss://solana-api.projectserum.com", - # "wss://rpc.ankr.com/solana", - # "wss://mainnet.rpcpool.com", ] PING_INTERVAL = 30 -SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes +SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 1 minute from config import ( -FOLLOWED_WALLET, SOLANA_HTTP_URL + FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY ) from modules.utils import telegram_utils @@ -106,85 +106,60 @@ class SolanaWS: async def process_messages(self): while True: message = await self.message_queue.get() - await self.on_message(message) + if self.on_message: + await self.on_message(message) logger.info(f"Received message: {message}") async def close(self): if self.websocket: await self.websocket.close() logger.info("WebSocket connection closed") - while True: - message = await self.message_queue.get() - try: - response_data = json.loads(message) - if 'params' in response_data: - log = response_data['params']['result'] - await process_log(log) - else: - logger.warning(f"Unexpected response: {response_data}") - except json.JSONDecodeError as e: - logger.error(f"Failed to decode JSON: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred while processing message: {e}") - finally: - self.message_queue.task_done() + async def solana_jsonrpc(method, params=None, jsonParsed=True): + if not isinstance(params, list): + params = [params] if params is not None else [] -async def solana_jsonrpc(method, params = None, jsonParsed = True): - # target json example: - # data = { - # "jsonrpc": "2.0", - # "id": 1, - # "method": "getTransaction", - # "params": [ - # tx_signature, - # { - # "encoding": "jsonParsed", - # "maxSupportedTransactionVersion": 0 - # } - # ] - # } - # if param is not array, make it array - if not isinstance(params, list): - params = [params] + data = { + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params + } + + if jsonParsed: + data["params"].append({"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}) + else: + data["params"].append({"maxSupportedTransactionVersion": 0}) - data = { - "jsonrpc": "2.0", - "id": 1, - "method": method, - "params": params or [] - } - data["params"].append({"maxSupportedTransactionVersion": 0}) - if jsonParsed: - data["params"][1]["encoding"] = "jsonParsed" - - - try: - # url = 'https://solana.drpc.org' - response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) - response.raise_for_status() # Raises an error for bad responses - result = response.json() - if not 'result' in result or 'error' in result: - print("Error fetching data from Solana RPC:", result) + try: + response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) + response.raise_for_status() + result = response.json() + if 'result' not in result or 'error' in result: + logger.error("Error fetching data from Solana RPC:", result) + return None + return result['result'] + except Exception as e: + logger.error(f"Error fetching data from Solana RPC: {e}") return None - return result['result'] - except Exception as e: - logging.error(f"Error fetching data from Solana RPC: {e}") - return None - -class SolanaAPI: - - def __init__(self, process_log_callback, send_telegram_message_callback, list_initial_wallet_states_callback): - self.process_log = process_log_callback - self.list_initial_wallet_states = list_initial_wallet_states_callback + +class SolanaAPI: + def __init__(self, process_transaction_callback, on_initial_subscription_callback = None, on_bot_message=None): + self.process_transaction = process_transaction_callback + self.on_initial_subscription = on_initial_subscription_callback + self.on_bot_message = on_bot_message, + + self.dex = SolanaDEX(DISPLAY_CURRENCY) + self.solana_ws = SolanaWS(on_message=self.process_transaction) async def process_messages(self, solana_ws): while True: message = await solana_ws.message_queue.get() - await self.process_log(message) - - async def wallet_watch_loop(): - solana_ws = SolanaWS(on_message=process_log) + await self.process_transaction(message) + + async def wallet_watch_loop(self): + + solana_ws = SolanaWS(on_message=self.process_transaction) first_subscription = True while True: @@ -193,10 +168,10 @@ class SolanaAPI: await solana_ws.subscribe() if first_subscription: - asyncio.create_task(self.list_initial_wallet_states()) + asyncio.create_task(self.on_initial_subscription()) first_subscription = False - await telegram_utils.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") + await self.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") receive_task = asyncio.create_task(solana_ws.receive_messages()) process_task = asyncio.create_task(solana_ws.process_messages()) @@ -214,20 +189,15 @@ class SolanaAPI: finally: await solana_ws.unsubscribe() if solana_ws.websocket: - await solana_ws.websocket.close() - await telegram_utils.send_telegram_message("Reconnecting...") + await solana_ws.close() + await self.send_telegram_message("Reconnecting...") await asyncio.sleep(5) - - async def process_transaction(signature): - # Implement your logic to process each transaction + + async def process_transaction(self, signature): print(f"Processing transaction: {signature['signature']}") - # You can add more processing logic here, such as storing in a database, - # triggering notifications, etc. - # Example usage - # async def main(): - # account_address = "Vote111111111111111111111111111111111111111" - - async def get_last_transactions(account_address, check_interval=300, limit=1000): + # Add your transaction processing logic here + + async def get_last_transactions(self, account_address, check_interval=300, limit=1000): last_check_time = None last_signature = None @@ -252,17 +222,521 @@ class SolanaAPI: if last_signature and signature['signature'] == last_signature: break - # Process the transaction - await process_transaction(signature) + await self.process_transaction(signature) if result: last_signature = result[0]['signature'] last_check_time = current_time - await asyncio.sleep(1) # Sleep for 1 second before checking again + await asyncio.sleep(1) + + async def get_token_metadata_symbol(mint_address): + global TOKENS_INFO + + if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]: + return TOKENS_INFO[mint_address].get('symbol') + + try: + account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", mint_address) + if 'value' in account_data_result and 'data' in account_data_result['value']: + account_data_data = account_data_result['value']['data'] + if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: + account_data_info = account_data_data['parsed']['info'] + if 'decimals' in account_data_info: + if mint_address in TOKENS_INFO: + TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals'] + else: + TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']} + if 'tokenName' in account_data_info: + if mint_address in TOKENS_INFO: + TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName'] + else: + TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']} + + metadata = await get_token_metadata(mint_address) + if metadata: + if mint_address in TOKENS_INFO: + TOKENS_INFO[mint_address].update(metadata) + else: + TOKENS_INFO[mint_address] = metadata + await save_token_info() + # TOKENS_INFO[mint_address] = metadata + # return metadata.get('symbol') or metadata.get('name') + return TOKENS_INFO[mint_address].get('symbol') + except Exception as e: + logging.error(f"Error fetching token name for {mint_address}: {str(e)}") + return None + + async def get_transaction_details_rpc(tx_signature, readfromDump=False): + global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO + try: + if readfromDump and os.path.exists('./logs/transation_details.json'): + with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details + transaction_details = json.load(f) + return transaction_details + else: + transaction_details = await solana_jsonrpc("getTransaction", tx_signature) + with open('./logs/transation_details.json', 'w') as f: + json.dump(transaction_details, f, indent=2) + + if transaction_details is None: + logging.error(f"Error fetching transaction details for {tx_signature}") + return None + + # Initialize default result structure + parsed_result = { + "order_id": None, + "token_in": None, + "token_out": None, + "amount_in": 0, + "amount_out": 0, + "amount_in_USD": 0, + "amount_out_USD": 0, + "percentage_swapped": 0 + } + + # Extract order_id from logs + log_messages = transaction_details.get("meta", {}).get("logMessages", []) + for log in log_messages: + if "order_id" in log: + parsed_result["order_id"] = log.split(":")[2].strip() + break + + # Extract token transfers from innerInstructions + inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) + for instruction_set in inner_instructions: + for instruction in instruction_set.get('instructions', []): + if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked': + info = instruction['parsed']['info'] + mint = info['mint'] + amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals + + # Determine which token is being swapped in and out based on zero balances + if parsed_result["token_in"] is None and amount > 0: + parsed_result["token_in"] = mint + parsed_result["amount_in"] = amount + + + if parsed_result["token_in"] is None or parsed_result["token_out"] is None: + # if we've failed to extract token_in and token_out from the transaction details, try a second method + inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) + transfers = [] + + for instruction_set in inner_instructions: + for instruction in instruction_set.get('instructions', []): + if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']: + info = instruction['parsed']['info'] + amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount']) + decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0 + adjusted_amount = amount / (10 ** decimals) + # adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0)) + transfers.append({ + 'mint': info.get('mint'), + 'amount': adjusted_amount, + 'source': info['source'], + 'destination': info['destination'] + }) + + # Identify token_in and token_out + if len(transfers) >= 2: + parsed_result["token_in"] = transfers[0]['mint'] + parsed_result["amount_in"] = transfers[0]['amount'] + parsed_result["token_out"] = transfers[-1]['mint'] + parsed_result["amount_out"] = transfers[-1]['amount'] + + # If mint is not provided, query the Solana network for the account data + if parsed_result["token_in"] is None or parsed_result["token_out"] is None: + #for transfer in transfers: + # do only first and last transfer + for transfer in [transfers[0], transfers[-1]]: + if transfer['mint'] is None: + # Query the Solana network for the account data + account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source']) + + if 'value' in account_data_result and 'data' in account_data_result['value']: + account_data_value = account_data_result['value'] + account_data_data = account_data_value['data'] + if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: + account_data_info = account_data_data['parsed']['info'] + if 'mint' in account_data_info: + transfer['mint'] = account_data_info['mint'] + if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]: + await get_token_metadata_symbol(transfer['mint']) + # get actual prices + current_price = await get_token_prices([transfer['mint']]) + + if parsed_result["token_in"] is None: + parsed_result["token_in"] = transfer['mint'] + parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol'] + parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] + parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']]) + elif parsed_result["token_out"] is None: + parsed_result["token_out"] = transfer['mint'] + parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol'] + parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] + parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price'] + + pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', []) + for balance in pre_balalnces: + if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET: + parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals'] + break + + + # Calculate percentage swapped + try: + if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0: + parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100 + else: + # calculate based on total wallet value: FOLLOWED_WALLET_VALUE + parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100 + except Exception as e: + logging.error(f"Error calculating percentage swapped: {e}") + + return parsed_result + + except requests.exceptions.RequestException as e: + print("Error fetching transaction details:", e) + -if __name__ == "__main__": - asyncio.run(wallet_watch_loop()) \ No newline at end of file + +class SolanaDEX: + def __init__(self, DISPLAY_CURRENCY): + self.DISPLAY_CURRENCY = DISPLAY_CURRENCY + pass + + async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: + global TOKENS_INFO + + # Skip for USD + prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"} + remaining_tokens = [addr for addr in token_addresses if addr not in prices] + + # Try CoinGecko + coingecko_prices = await get_prices_from_coingecko(remaining_tokens) + prices.update(coingecko_prices) + + + # For remaining missing tokens, try Jupiter + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + jupiter_prices = await get_prices_from_jupiter(list(missing_tokens)) + prices.update(jupiter_prices) + + + # For tokens not found in CoinGecko, use DexScreener + missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys()) + if missing_tokens: + dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens)) + prices.update(dexscreener_prices) + + # For remaining missing tokens, try Raydium + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + raydium_prices = await get_prices_from_raydium(list(missing_tokens)) + prices.update(raydium_prices) + + # For remaining missing tokens, try Orca + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + orca_prices = await get_prices_from_orca(list(missing_tokens)) + prices.update(orca_prices) + + # If any tokens are still missing, set their prices to 0 + for token in set(token_addresses) - set(prices.keys()): + prices[token] = 0.0 + logging.warning(f"Price not found for token {token}. Setting to 0.") + + for token, price in prices.items(): + token_info = TOKENS_INFO.setdefault(token, {}) + if 'symbol' not in token_info: + token_info['symbol'] = await get_token_metadata_symbol(token) + token_info['price'] = price + + return prices + + async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]: + base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana" + prices = {} + + async def fetch_single_price(session, address): + params = { + "contract_addresses": address, + "vs_currencies": DISPLAY_CURRENCY.lower() + } + try: + async with session.get(base_url, params=params) as response: + if response.status == 200: + data = await response.json() + if address in data and DISPLAY_CURRENCY.lower() in data[address]: + return address, data[address][DISPLAY_CURRENCY.lower()] + else: + logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}") + return address, None + + async with aiohttp.ClientSession() as session: + tasks = [fetch_single_price(session, address) for address in token_addresses] + results = await asyncio.gather(*tasks) + + for address, price in results: + if price is not None: + prices[address] = price + + return prices + + async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]: + base_url = "https://api.dexscreener.com/latest/dex/tokens/" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] + results = await asyncio.gather(*tasks) + + for address, result in zip(token_addresses, results): + if result and 'pairs' in result and result['pairs']: + pair = result['pairs'][0] # Use the first pair (usually the most liquid) + prices[address] = float(pair['priceUsd']) + else: + logging.warning(f"No price data found on DexScreener for token {address}") + except Exception as e: + logging.error(f"Error fetching token prices from DexScreener: {str(e)}") + + return prices + + async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]: + url = "https://price.jup.ag/v4/price" + params = { + "ids": ",".join(token_addresses) + } + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + for address, price_info in data.get('data', {}).items(): + if 'price' in price_info: + prices[address] = float(price_info['price']) + else: + logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Jupiter: {str(e)}") + return prices + + # New function for Raydium + async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]: + url = "https://api.raydium.io/v2/main/price" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + for address in token_addresses: + if address in data: + prices[address] = float(data[address]) + else: + logging.error(f"Failed to get token prices from Raydium. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Raydium: {str(e)}") + return prices + + # New function for Orca + async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]: + url = "https://api.orca.so/allTokens" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + for token_info in data: + if token_info['mint'] in token_addresses: + prices[token_info['mint']] = float(token_info['price']) + else: + logging.error(f"Failed to get token prices from Orca. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Orca: {str(e)}") + return prices + + async def fetch_token_data(session, url): + try: + async with session.get(url) as response: + if response.status == 200: + return await response.json() + else: + logging.error(f"Failed to fetch data from {url}. Status: {response.status}") + return None + except Exception as e: + logging.error(f"Error fetching data from {url}: {str(e)}") + return None + + async def get_sol_price() -> float: + sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address + return await get_token_prices([sol_address]).get(sol_address, 0.0) + + async def get_wallet_balances(wallet_address, doGetTokenName=True): + balances = {} + logging.info(f"Getting balances for wallet: {wallet_address}") + global TOKENS_INFO + try: + response = await solana_client.get_token_accounts_by_owner_json_parsed( + Pubkey.from_string(wallet_address), + opts=TokenAccountOpts( + program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") + ), + commitment=Confirmed + ) + + if response.value: + for account in response.value: + try: + parsed_data = account.account.data.parsed + if isinstance(parsed_data, dict) and 'info' in parsed_data: + info = parsed_data['info'] + if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info: + mint = info['mint'] + decimals = info['tokenAmount']['decimals'] + amount = float(info['tokenAmount']['amount'])/10**decimals + if amount > 0: + if mint in TOKENS_INFO: + token_name = TOKENS_INFO[mint].get('symbol') + elif doGetTokenName: + token_name = await get_token_metadata_symbol(mint) or 'N/A' + # sleep for 1 second to avoid rate limiting + await asyncio.sleep(2) + + TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals) + TOKENS_INFO[mint]['decimals'] = decimals + balances[mint] = { + 'name': token_name or 'N/A', + 'address': mint, + 'amount': amount, + 'decimals': decimals + } + # sleep for 1 second to avoid rate limiting + logging.debug(f"Account balance for {token_name} ({mint}): {amount}") + else: + logging.warning(f"Unexpected data format for account: {account}") + except Exception as e: + logging.error(f"Error parsing account data: {str(e)}") + + sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) + if sol_balance.value is not None: + balances['SOL'] = { + 'name': 'SOL', + 'address': 'SOL', + 'amount': sol_balance.value / 1e9 + } + else: + logging.warning(f"SOL balance response missing for wallet: {wallet_address}") + + except Exception as e: + logging.error(f"Error getting wallet balances: {str(e)}") + logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") + return balances + + async def convert_balances_to_currency(balances , sol_price): + converted_balances = {} + for address, info in balances.items(): + converted_balance = info.copy() # Create a copy of the original info + if info['name'] == 'SOL': + converted_balance['value'] = info['amount'] * sol_price + elif address in TOKEN_PRICES: + converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] + else: + converted_balance['value'] = None # Price not available + logging.warning(f"Price not available for token {info['name']} ({address})") + converted_balances[address] = converted_balance + return converted_balances + + + async def list_initial_wallet_states(): + global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES + global TOKENS_INFO # new + + followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) + your_wallet_balances = await get_wallet_balances(YOUR_WALLET) + + all_token_addresses = list(set(followed_wallet_balances.keys()) | + set(your_wallet_balances.keys()) | + set(TOKEN_ADDRESSES.values())) + + TOKEN_PRICES = await get_token_prices(all_token_addresses) + sol_price = await get_sol_price() + + followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) + your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) + + + TOKEN_ADDRESSES = { + address: info for address, + info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0 + } + logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") + + followed_wallet_state = [] + FOLLOWED_WALLET_VALUE = 0 + for address, info in followed_converted_balances.items(): + if info['value'] is not None and info['value'] > 0: + followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})") + FOLLOWED_WALLET_VALUE += info['value'] + + your_wallet_state = [] + YOUR_WALLET_VALUE = 0 + for address, info in your_converted_balances.items(): + if info['value'] is not None and info['value'] > 0: + your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") + YOUR_WALLET_VALUE += info['value'] + + message = ( + f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" + f"Followed Wallet ({FOLLOWED_WALLET}):\n" + f"{chr(10).join(followed_wallet_state)}\n" + f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Your Wallet ({YOUR_WALLET}):\n" + f"{chr(10).join(your_wallet_state)}\n" + f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Monitored Tokens:\n" + f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}" + ) + + logging.info(message) + await telegram_utils.send_telegram_message(message) + + # save token info to file + await save_token_info() + + + +#example +# async def main(): +# await telegram_utils.initialize() + +# async def process_log(log): +# print(f"Processing log: {log}") + +# async def list_initial_wallet_states(): +# print("Listing initial wallet states") + + +# wallet_watch_task = asyncio.create_task(solana_api.wallet_watch_loop()) + +# try: +# await asyncio.gather(wallet_watch_task) +# except asyncio.CancelledError: +# pass +# finally: +# await telegram_utils.close() + +# if __name__ == "__main__": +# asyncio.run(main()) \ No newline at end of file diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py index b70b567..6ed7ab7 100644 --- a/crypto/sol/modules/utils.py +++ b/crypto/sol/modules/utils.py @@ -28,7 +28,7 @@ class TelegramUtils: await self.initialize() try: - await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) + # await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) logging.info(f"Telegram message sent: {message}") except Exception as e: logging.error(f"Error sending Telegram message: {str(e)}") From 296c4dcf8163c64d283a96dc3c1b13a00b96039d Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Wed, 16 Oct 2024 11:50:59 +0300 Subject: [PATCH 13/13] wip --- crypto/sol/modules/SolanaAPI.py | 35 +++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index e0fa5cb..7f5cda2 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -34,6 +34,7 @@ class SolanaWS: self.subscription_id = None self.message_queue = asyncio.Queue() self.on_message = on_message + self.websocket = None async def connect(self): while True: @@ -46,7 +47,7 @@ class SolanaWS: logger.error(f"Failed to connect to {current_url}: {e}") await asyncio.sleep(5) - async def ws_jsonrpc(self, method, params=None): + async def ws_jsonrpc(self, ws, method, params=None, doProcessResponse = True): if not isinstance(params, list): params = [params] if params is not None else [] @@ -57,25 +58,29 @@ class SolanaWS: "params": params } - await self.websocket.send(json.dumps(request)) - response = await self.websocket.recv() - response_data = json.loads(response) - - if 'result' in response_data: - return response_data['result'] - elif 'error' in response_data: - logger.error(f"Error in WebSocket RPC call: {response_data['error']}") + await ws.send(json.dumps(request)) + if not doProcessResponse: return None else: - logger.warning(f"Unexpected response: {response_data}") - return None + response = await self.websocket.recv() + response_data = json.loads(response) + + if 'result' in response_data: + return response_data['result'] + elif 'error' in response_data: + logger.error(f"Error in WebSocket RPC call: {response_data['error']}") + return None + else: + logger.warning(f"Unexpected response: {response_data}") + return None async def subscribe(self): params = [ {"mentions": [FOLLOWED_WALLET]}, {"commitment": "confirmed"} ] - result = await self.ws_jsonrpc("logsSubscribe", params) + result = await self.ws_jsonrpc("logsSubscribe", params, doProcessResponse=False) + response = process_messages(self.websocket) if result is not None: self.subscription_id = result logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") @@ -171,7 +176,7 @@ class SolanaAPI: asyncio.create_task(self.on_initial_subscription()) first_subscription = False - await self.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") + await self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") receive_task = asyncio.create_task(solana_ws.receive_messages()) process_task = asyncio.create_task(solana_ws.process_messages()) @@ -190,7 +195,7 @@ class SolanaAPI: await solana_ws.unsubscribe() if solana_ws.websocket: await solana_ws.close() - await self.send_telegram_message("Reconnecting...") + await self.on_bot_message("Reconnecting...") await asyncio.sleep(5) async def process_transaction(self, signature): @@ -416,7 +421,7 @@ class SolanaDEX: remaining_tokens = [addr for addr in token_addresses if addr not in prices] # Try CoinGecko - coingecko_prices = await get_prices_from_coingecko(remaining_tokens) + coingecko_prices = await self.get_prices_from_coingecko(remaining_tokens) prices.update(coingecko_prices)