From 1b0c72dc08f358a790d690ec90542a7e7d12ec85 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 18 Nov 2024 20:12:58 +0200 Subject: [PATCH] storage commeted --- crypto/sol/.env | 15 +- crypto/sol/modules/SolanaAPI.py | 185 ++++++----- crypto/sol/modules/webui.py | 532 ++++++++++++++++++-------------- 3 files changed, 419 insertions(+), 313 deletions(-) diff --git a/crypto/sol/.env b/crypto/sol/.env index 872d843..1e5dfd2 100644 --- a/crypto/sol/.env +++ b/crypto/sol/.env @@ -23,18 +23,19 @@ TELEGRAM_BOT_TOKEN="6749075936:AAHUHiPTDEIu6JH7S2fQdibwsu6JVG3FNG0" DISPLAY_CURRENCY=USD #FOLLOW_AMOUNT=3 -FOLLOW_AMOUNT=percentage +# proportional, xx%, +FOLLOW_AMOUNT=5% LIQUIDITY_TOKENS=EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v,So11111111111111111111111111111111111111112 PRIORITY=1 # 0-10, 5 = market cap, 10 twice market cap DO_WATCH_WALLET=True # Niki's to Sync: [PROD] -#FOLLOWED_WALLET="3EZkyU9zQRrHPnrNovDiRCA9Yg3wLK35u9cdrcGcszi1" -#YOUR_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" -#PK=3FxXjNrtEqwAKYj4BpkuLAJPzuKRWykkvjeBYQEVuFqRFWRm9eVcWrrYKbns2M31ESMoASG2WV39w9Dpx532sPUH +FOLLOWED_WALLET="7TS3ATxhEVyah29gU7Z6zwwsNpWLm88ZPBig1dwyMFip" +YOUR_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" +PK=3FxXjNrtEqwAKYj4BpkuLAJPzuKRWykkvjeBYQEVuFqRFWRm9eVcWrrYKbns2M31ESMoASG2WV39w9Dpx532sPUH # Sync to main [DEV] -FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" -YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5" -PK=5ccrMf3BFFE1HMsXt17btK1tMSNay7aBoY27saPHrqg2JEjxKBmBbxUABD9Jh7Gisf1bhM51oGzWdyLUgHdrUJPw \ No newline at end of file +#FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" +#YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5" +#PK=5ccrMf3BFFE1HMsXt17btK1tMSNay7aBoY27saPHrqg2JEjxKBmBbxUABD9Jh7Gisf1bhM51oGzWdyLUgHdrUJPw \ No newline at end of file diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index 0be36c7..8b4a95d 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -48,7 +48,7 @@ import random import websockets from typing import Dict, List, Optional import requests -from datetime import datetime +from datetime import datetime, timedelta from solana.rpc.types import TokenAccountOpts, TxOpts from typing import List, Dict, Any, Tuple import traceback @@ -134,6 +134,9 @@ class SolanaWS: elif 'error' in response_data: logger.error(f"Error in WebSocket RPC call: {response_data['error']}") return None + # if result is integer + elif "id" in response_data and int(response_data['id']) == 1: + return int(response_data['result']) else: logger.warning(f"Unexpected response: {response_data}") return None @@ -789,30 +792,36 @@ class SolanaAPI: async def follow_move(self,move): try: - your_balances = await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False) - your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) - if your_balance_info is not None: - # Use the balance - print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}") - else: - print(f"No ballance found for {move['symbol_in']}. Skipping move.") - await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") - return - - your_balance = your_balance_info['amount'] - + try: + your_balances = await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False) + your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) + if your_balance_info is not None: + # Use the balance + print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}") + # else: + # print(f"No ballance found for {move['symbol_in']}. Skipping move.") + # await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") + # return + + your_balance = your_balance_info['amount'] + + if not your_balance: + msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move." + logging.warning(msg) + await telegram_utils.send_telegram_message(msg) + return + + except Exception as e: + logging.error(f"Error fetching your balance: {e}") + if FOLLOW_AMOUNT == 'proportional': + return token_info = DEX.TOKENS_INFO.get(move['token_in']) token_name_in = token_info.get('symbol') or await SAPI.get_token_metadata_symbol(move['token_in']) token_name_out = DEX.TOKENS_INFO[move['token_out']].get('symbol') or await SAPI.get_token_metadata_symbol(move['token_out']) - if not your_balance: - msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move." - logging.warning(msg) - await telegram_utils.send_telegram_message(msg) - return - if FOLLOW_AMOUNT == 'percentage': + if FOLLOW_AMOUNT == 'proportional': # Calculate the amount to swap based on the same percentage as the followed move if move.get('percentage_swapped') is None: followed_ballances = await DEX.get_wallet_balances(FOLLOWED_WALLET, doGetTokenName=False) @@ -827,11 +836,20 @@ class SolanaAPI: amount_to_swap = 100 else: amount_to_swap = your_balance * (move['percentage_swapped'] / 100) - elif FOLLOW_AMOUNT == 'exact': - amount_to_swap = move['amount_in'] + # if contains %, then calculate the amount to swap based on the same percentage as the followed move + elif '%' in FOLLOW_AMOUNT: + try: + percentage = float(FOLLOW_AMOUNT.strip('%')) + amount_to_swap = move['amount_in'] * (percentage / 100) + except ValueError: + msg = f"Move not followed:\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number." + logging.warning(msg) + await telegram_utils.send_telegram_message(msg) + return + else: try: - fixed_amount = float(FOLLOW_AMOUNT) # un USD + fixed_amount = float(FOLLOW_AMOUNT) # in USD fixed_amount_in_token = fixed_amount / move["token_in_price"] amount_to_swap = min(fixed_amount_in_token, your_balance) except ValueError: @@ -840,7 +858,7 @@ class SolanaAPI: await telegram_utils.send_telegram_message(msg) return - amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have + # amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have decimals = token_info.get('decimals') # Convert to lamports @@ -848,26 +866,26 @@ class SolanaAPI: amount_lamports = int(amount_to_swap * 10**decimals) logging.debug(f"Calculated amount in lamports: {amount_lamports}") - if your_balance < amount_to_swap: # should not happen - msg = ( - f"Warning:\n" - f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount_lamports}).\n This will probably fail. But we will try anyway." - ) - logging.warning(msg) - await telegram_utils.send_telegram_message(msg) + # if your_balance < amount_to_swap: # should not happen + # msg = ( + # f"Warning:\n" + # f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount_lamports}).\n This will probably fail. But we will try anyway." + # ) + # logging.warning(msg) + # await telegram_utils.send_telegram_message(msg) try: - try: - notification = ( - f"Initiating move:\n" - f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}" - + (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "") - ) - # logging.info(notification) - # error_logger.info(notification) - # await telegram_utils.send_telegram_message(notification) - except Exception as e: - logging.error(f"Error sending notification: {e}") + # try: + # notification = ( + # f"Initiating move:\n" + # f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}" + # + (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "") + # ) + # # logging.info(notification) + # # error_logger.info(notification) + # # await telegram_utils.send_telegram_message(notification) + # except Exception as e: + # logging.error(f"Error sending notification: {e}") if self.pk is None: self.pk = await get_pk() @@ -877,6 +895,7 @@ class SolanaAPI: async_client = AsyncClient(SOLANA_WS_URL) jupiter = Jupiter(async_client, private_key) + # https://station.jup.ag/api-v6/post-swap #transaction_data = await jupiter.swap( transaction_data = await self.swap_on_jupiter( @@ -889,6 +908,7 @@ class SolanaAPI: logging.info(f"Initiating move. Transaction data:\n {transaction_data}") raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data)) + # working - no priority fee signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message)) signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature]) @@ -896,8 +916,9 @@ class SolanaAPI: opts = TxOpts( skip_preflight=False, preflight_commitment=Processed, + max_retries=5 # Add retries for network issues ) - + # send the transaction result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts) @@ -913,7 +934,10 @@ class SolanaAPI: logging.warning(f"Failed to get transaction details for {transaction_id}.\n Probably transaction failed. Retrying again...") await asyncio.sleep(3) except Exception as e: - decoded_data = ''# base64.b64decode(transaction_data) + # decode transacion data (try base58/64) + # decoded_data = base58.b58decode(transaction_data).decode('utf-8') + # decoded_data = base64.b64decode(transaction_data).decode('utf-8') + decoded_data = None error_message = f"Move Failed:\n{str(e)}\n{decoded_data}\n{move}" logging.error(error_message) # log the errors to /logs/errors.log @@ -1045,7 +1069,8 @@ class SolanaDEX: token_info = self.TOKENS_INFO.setdefault(token, {}) if 'symbol' not in token_info: token_info['symbol'] = await SAPI.get_token_metadata_symbol(token) - token_info['price'] = price + token_info['price'] = price + token_info['lastUpdated'] = datetime.now().isoformat() return prices @@ -1186,6 +1211,10 @@ class SolanaDEX: asyncio.set_event_loop(loop) logging.info(f"Getting balances for wallet: {wallet_address}") response = None + # if ballances got in last 2 minutes, return them + if "lastUpdated" in self.TOKENS_INFO and datetime.fromisoformat(self.TOKENS_INFO["lastUpdated"]) > datetime.now() - timedelta(minutes=2): + logging.info(f"Using cached balances for wallet: {wallet_address}") + return self.TOKENS_INFO try: response = await self.solana_client.get_token_accounts_by_owner_json_parsed( Pubkey.from_string(wallet_address), @@ -1204,46 +1233,46 @@ class SolanaDEX: if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info: mint = info['mint'] decimals = int(info['tokenAmount']['decimals']) - amount = int(info['tokenAmount']['amount']) - amount = int(amount) - if amount > 1: - amount = float(amount / 10**decimals) - if mint in self.TOKENS_INFO: - token_name = self.TOKENS_INFO[mint].get('symbol') - elif doGetTokenName: - token_name = await SAPI.get_token_metadata_symbol(mint) or 'N/A' - self.TOKENS_INFO[mint] = {'symbol': token_name} - await asyncio.sleep(2) - - self.TOKENS_INFO[mint]['holdedAmount'] = round(amount, decimals) - self.TOKENS_INFO[mint]['decimals'] = decimals - balances[mint] = { - 'name': token_name or 'N/A', - 'address': mint, - 'amount': amount, - 'decimals': decimals - } - try: - logging.debug(f"Account balance for {token_name} ({mint}): {amount}") - except Exception as e: - logging.error(f"Error logging account balance: {str(e)}") + amount = float(info['tokenAmount']['uiAmountString']) + # amount = float(info['tokenAmount']['amount']) + # amount = float(amount / 10**decimals) + token_name = None + if mint in self.TOKENS_INFO: + token_name = self.TOKENS_INFO[mint].get('symbol') + elif doGetTokenName: + token_name = await SAPI.get_token_metadata_symbol(mint) + await asyncio.sleep(2) + balances[mint] = { + 'name': token_name or 'N/A', + 'address': mint, + 'amount': amount, + 'decimals': decimals + } + self.TOKENS_INFO[mint] = {'symbol': token_name} + self.TOKENS_INFO[mint] = self.TOKENS_INFO[mint].update(balances[mint]) + + try: + logging.debug(f"Account balance for {token_name or "N/A"} ({mint}): {amount}") + except Exception as e: + logging.error(f"Error logging account balance: {str(e)}") else: logging.warning(f"Unexpected data format for account: {account}") except Exception as e: logging.error(f"Error parsing account data: {str(e)}") - - sol_balance = await self.solana_client.get_balance(Pubkey.from_string(wallet_address)) - if sol_balance.value is not None: - balances['SOL'] = { - 'name': 'SOL', - 'address': 'SOL', - 'amount': sol_balance.value / 1e9 - } - else: - logging.warning(f"SOL balance response missing for wallet: {wallet_address}") + self.TOKENS_INFO["lastUpdated"] = datetime.now().isoformat() + + # sol_balance = await self.solana_client.get_balance(Pubkey.from_string(wallet_address)) + # if sol_balance.value is not None: + # balances['SOL'] = { + # 'name': 'SOL', + # 'address': 'SOL', + # 'amount': sol_balance.value / 1e9 + # } + # else: + # logging.warning(f"SOL balance response missing for wallet: {wallet_address}") except Exception as e: - logging.error(f"Error getting wallet balances: {str(e)}") + logging.error(f"Error getting wallet balances: {str(e)} {e.error_msg}") if response and response.value: logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") else: diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py index 3449343..20aca8c 100644 --- a/crypto/sol/modules/webui.py +++ b/crypto/sol/modules/webui.py @@ -5,15 +5,24 @@ from concurrent.futures import ThreadPoolExecutor import traceback from flask import Flask, jsonify, request, render_template, redirect, url_for + # from flask_oauthlib.client import OAuth -from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user +from flask_login import ( + LoginManager, + UserMixin, + login_user, + login_required, + logout_user, + current_user, +) import secrets import json + # from crypto.sol.config import LIQUIDITY_TOKENS from config import LIQUIDITY_TOKENS, YOUR_WALLET from modules import storage, utils, SolanaAPI -from modules.utils import async_safe_call, decode_instruction_data +from modules.utils import async_safe_call, decode_instruction_data from modules.storage import Storage import os import logging @@ -21,16 +30,19 @@ from datetime import datetime on_transaction = None + def init_app(tr_handler=None): global on_transaction on_transaction = tr_handler - app = Flask(__name__, template_folder='../templates', static_folder='../static') - + app = Flask(__name__, template_folder="../templates", static_folder="../static") + app.secret_key = secrets.token_hex(16) - executor = ThreadPoolExecutor(max_workers=10) # Adjust the number of workers as needed + executor = ThreadPoolExecutor( + max_workers=10 + ) # Adjust the number of workers as needed login_manager = LoginManager(app) - login_manager.login_view = 'login' - + login_manager.login_view = "login" + storage = Storage() # Ensure database connection @@ -55,17 +67,18 @@ def init_app(tr_handler=None): # authorize_url='https://accounts.google.com/o/oauth2/auth', # ) - login_manager = LoginManager() login_manager.init_app(app) - + logger = logging.getLogger(__name__) - + # API - @app.route('/tr//', methods=['GET', 'POST']) + @app.route("/tr//", methods=["GET", "POST"]) async def transaction_notified(wallet, tx_signature): try: - logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}") + logger.info( + f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}" + ) request_data = request.get_json() if request.is_json else None if not request_data: # Process the transaction @@ -73,73 +86,76 @@ def init_app(tr_handler=None): tr = await SolanaAPI.SAPI.get_transaction_details_info(tx_signature, []) else: tr = request_data - - - # ToDo - probably optimize - tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in']) - tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out']) - prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']]) - tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in'] - tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out'] - - notification = ( - f"Got TXN notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n" + + # ToDo - probably optimize + tr["symbol_in"] = await SolanaAPI.SAPI.get_token_metadata_symbol( + tr["token_in"] ) + tr["symbol_out"] = await SolanaAPI.SAPI.get_token_metadata_symbol( + tr["token_out"] + ) + prices = await SolanaAPI.DEX.get_token_prices( + [tr["token_in"], tr["token_out"]] + ) + tr["value_in_USD"] = prices.get(tr["token_in"], 0) * tr["amount_in"] + tr["value_out_USD"] = prices.get(tr["token_out"], 0) * tr["amount_out"] + + notification = f"Got TXN notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n" logging.info(notification) await utils.telegram_utils.send_telegram_message(notification) - + # Store the notified transaction in the database - original_transaction = storage.Transaction( - wallet=wallet, - transaction_type="SWAP", - symbol_in=tr['symbol_in'], - amount_in=tr['amount_in'], - value_in_usd=tr['value_in_USD'], - symbol_out=tr['symbol_out'], - amount_out=tr['amount_out'], - value_out_usd=tr['value_out_USD'], - tx_signature=tx_signature - ) - await storage.store_transaction(original_transaction) - # Attempt to execute the copytrade transaction + # original_transaction = storage.Transaction( + # wallet=wallet, + # transaction_type="SWAP", + # symbol_in=tr['symbol_in'], + # amount_in=tr['amount_in'], + # value_in_usd=tr['value_in_USD'], + # symbol_out=tr['symbol_out'], + # amount_out=tr['amount_out'], + # value_out_usd=tr['value_out_USD'], + # tx_signature=tx_signature + # ) + # await storage.store_transaction(original_transaction) + # # Attempt to execute the copytrade transaction try: await SolanaAPI.SAPI.follow_move(tr) # Store the successful copytrade transaction - follow_transaction = storage.Transaction( - wallet=wallet, - transaction_type="SWAP", - symbol_in=tr['symbol_in'], - amount_in=tr['amount_in'], - value_in_usd=tr['value_in_USD'], - symbol_out=tr['symbol_out'], - amount_out=tr['amount_out'], - value_out_usd=tr['value_out_USD'], - tx_signature=tx_signature - ) - await storage.store_transaction(follow_transaction) + # follow_transaction = storage.Transaction( + # wallet=wallet, + # transaction_type="SWAP", + # symbol_in=tr['symbol_in'], + # amount_in=tr['amount_in'], + # value_in_usd=tr['value_in_USD'], + # symbol_out=tr['symbol_out'], + # amount_out=tr['amount_out'], + # value_out_usd=tr['value_out_USD'], + # tx_signature=tx_signature + # ) + # await storage.store_transaction(follow_transaction) except Exception as e: - # Store the failed copytrade transaction - failed_transaction = storage.Transaction( - wallet=wallet, - transaction_type="SWAP_FAIL", - symbol_in=tr['symbol_in'], - amount_in=tr['amount_in'], - value_in_usd=tr['value_in_USD'], - symbol_out=tr['symbol_out'], - amount_out=tr['amount_out'], - value_out_usd=tr['value_out_USD'], - tx_signature=tx_signature - ) - await storage.store_transaction(failed_transaction) + # # Store the failed copytrade transaction + # failed_transaction = storage.Transaction( + # wallet=wallet, + # transaction_type="SWAP_FAIL", + # symbol_in=tr['symbol_in'], + # amount_in=tr['amount_in'], + # value_in_usd=tr['value_in_USD'], + # symbol_out=tr['symbol_out'], + # amount_out=tr['amount_out'], + # value_out_usd=tr['value_out_USD'], + # tx_signature=tx_signature + # ) + # await storage.store_transaction(failed_transaction) logging.error(f"Copytrade transaction failed: {e}") - # ToDo - probably optimize + # ToDo - probably optimize await SolanaAPI.SAPI.save_token_info() return jsonify(tr), 200 except Exception as e: logging.error(f"Error processing transaction: {e}") return jsonify({"error": "Failed to process transaction"}), 500 - @app.route('/wh', methods=['POST']) + @app.route("/wh", methods=["POST"]) async def webhook(): try: current_time = datetime.now().strftime("%Y%m%d-%H%M%S") @@ -152,29 +168,30 @@ def init_app(tr_handler=None): else: logger.info(f"Webhook data: {request_data}") # save dump to /cache/last-webhook-{datetime}.json - - with open( os.path.join(SolanaAPI.root_path, 'logs', f'wh_{current_time}.json') , 'w') as f: + + with open( + os.path.join(SolanaAPI.root_path, "logs", f"wh_{current_time}.json"), + "w", + ) as f: json.dump(request_data, f) - - + if "meta" in request_data[0]: meta = request_data[0]["meta"] - + # Parse inner instructions for inner_ix in meta.get("innerInstructions", []): for instruction in inner_ix.get("instructions", []): decoded = decode_instruction_data(instruction["data"]) logger.info(f"Instruction data decoded: {decoded}") - + # Example of pattern matching for specific instruction types - if decoded["instruction_type"] == 1: # Example: swap instruction + if ( + decoded["instruction_type"] == 1 + ): # Example: swap instruction # Parse parameters based on program type # Different DEXes will have different parameter layouts pass - - - - + # await process_wh(request_data) # don't wait for the process to finish executor.submit(asyncio.run, process_wh(request_data)) @@ -182,33 +199,37 @@ def init_app(tr_handler=None): except Exception as e: logging.error(f"Error processing webhook: {e}") return jsonify({"error": "Failed to process webhook"}), 500 - - # Flask route to retry processing the last log - - async def process_wh(data): + + # Flask route to retry processing the last log + + async def process_wh(data): global on_transaction - + try: - if data[0].get('type') == "SWAP": - swap_event = data[0]['events'].get('swap') + if data[0].get("type") == "SWAP": + swap_event = data[0]["events"].get("swap") if not swap_event: logging.warning("No swap event found in data") return - + # Extract token input details from the first token input - token_inputs = swap_event.get('tokenInputs', []) - token_outputs = swap_event.get('tokenOutputs', []) - + token_inputs = swap_event.get("tokenInputs", []) + token_outputs = swap_event.get("tokenOutputs", []) + tr = {} - wallet = data[0]['feePayer'] # Using feePayer as the wallet address - tx_signature = data[0]['signature'] + wallet = data[0]["feePayer"] # Using feePayer as the wallet address + tx_signature = data[0]["signature"] usdcMint = LIQUIDITY_TOKENS[0] solMint = LIQUIDITY_TOKENS[1] - - + try: # Determine transaction type - if token_inputs and token_outputs and LIQUIDITY_TOKENS[0] in [token_inputs[0]["mint"], token_outputs[0]["mint"]]: + if ( + token_inputs + and token_outputs + and LIQUIDITY_TOKENS[0] + in [token_inputs[0]["mint"], token_outputs[0]["mint"]] + ): if token_inputs[0]["mint"] == usdcMint: tr["type"] = "BUY" else: @@ -216,107 +237,137 @@ def init_app(tr_handler=None): else: tr["type"] = "SWAP" - - if swap_event.get('nativeInput', None): + if swap_event.get("nativeInput", None): tr["token_in"] = solMint - tr["amount_in"] = int(swap_event.get('nativeInput')["amount"])/ 10**6 + tr["amount_in"] = ( + int(swap_event.get("nativeInput")["amount"]) / 10**9 + ) tr["type"] = "BUY" - tr["token_in_decimals"] = 6 - - if swap_event.get('nativeOutput', None): + tr["token_in_decimals"] = 9 + + if swap_event.get("nativeOutput", None): tr["token_out"] = solMint - tr["amount_out"] = int(swap_event.get('nativeOutput')["amount"]) / 10**6 + tr["amount_out"] = ( + int(swap_event.get("nativeOutput")["amount"]) / 10**9 + ) tr["type"] = "SELL" - tr["token_out_decimals"] = 6 + tr["token_out_decimals"] = 9 - if not token_inputs or len(token_inputs) == 0: - logging.info("Assumed USDC as first token. BUY transaction detected") - tr["token_in"] = usdcMint - tr["type"] = "BUY" - tr["amount_in"] = await calculate_price_amount(token_outputs[0]) - else: - token_in = token_inputs[0] - tr["token_in"] = token_in["mint"] - tr["token_in_decimals"] = get_decimals(token_in) - tr["amount_in"] = calculate_amount(token_in) + # if we don't have token_in yet + if "token_in" not in tr: + if not token_inputs or len(token_inputs) == 0: + logging.info( + "Assumed USDC as first token. BUY transaction detected" + ) + tr["token_in"] = usdcMint + tr["type"] = "BUY" + tr["amount_in"] = await calculate_price_amount( + token_outputs[0] + ) + else: + token_in = token_inputs[0] + tr["token_in"] = token_in["mint"] + tr["token_in_decimals"] = get_decimals(token_in) + tr["amount_in"] = calculate_amount(token_in) + + # if we don't have token_out yet + if "token_out" not in tr: + if not token_outputs or len(token_outputs) == 0: + logging.info( + "Assumed USDC as second token. SELL transaction detected" + ) + tr["token_out"] = usdcMint + tr["type"] = "SELL" + tr["amount_out"] = await calculate_price_amount( + token_inputs[0] + ) + else: + token_out = token_outputs[0] + tr["token_out"] = token_out["mint"] + tr["token_out_decimals"] = get_decimals(token_out) + tr["amount_out"] = calculate_amount(token_out) - if not token_outputs or len(token_outputs) == 0: - logging.info("Assumed USDC as second token. SELL transaction detected") - tr["token_out"] = usdcMint - tr["type"] = "SELL" - tr["amount_out"] = await calculate_price_amount(token_inputs[0]) - else: - token_out = token_outputs[0] - tr["token_out"] = token_out["mint"] - tr["token_out_decimals"] = get_decimals(token_out) - tr["amount_out"] = calculate_amount(token_out) - # Store transaction in database if tr["type"] in ["BUY", "SELL"]: is_buy = tr["type"] == "BUY" - - transaction = storage.Transaction( - wallet=wallet, - transaction_type=tr["type"], - symbol_in=tr["token_in"], - amount_in=tr["amount_in"] if is_buy else 0, - value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0, - symbol_out=tr["token_out"], - amount_out=tr["amount_out"] if not is_buy else 0, - value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0, - tx_signature=tx_signature + + # transaction = storage.Transaction( + # wallet=wallet, + # transaction_type=tr["type"], + # symbol_in=tr["token_in"], + # amount_in=tr["amount_in"] if is_buy else 0, + # value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0, + # symbol_out=tr["token_out"], + # amount_out=tr["amount_out"] if not is_buy else 0, + # value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0, + # tx_signature=tx_signature + # ) + # await storage.store_transaction(transaction) + + if swap_event.get("nativeInput"): # SOL + token_in = swap_event.get("nativeInput", []) + logger.info( + f"Native input (SOL) detected ({token_in["amount"]})" + ) + + if ( + not tr["token_in"] + or not tr["token_out"] + or tr["amount_in"] == 0 + or tr["amount_out"] == 0 + ): + logging.warning( + "Incomplete swap details found in logs. Getting details from transaction" + ) + tx_signature = data[0].get("signature") + logs = data[0].get("logs", []) + tr = await SolanaAPI.SAPI.get_transaction_details_info( + tx_signature, logs ) - await storage.store_transaction(transaction) - - if swap_event.get('nativeInput'): # SOL - token_in = swap_event.get('nativeInput', []) - logger.info(f"Native input (SOL) detected ({token_in["amount"]})") - - if not tr["token_in"] or not tr["token_out"] or tr["amount_in"] == 0 or tr["amount_out"] == 0: - logging.warning("Incomplete swap details found in logs. Getting details from transaction") - tx_signature = data[0].get('signature') - logs = data[0].get('logs', []) - tr = await SolanaAPI.SAPI.get_transaction_details_info(tx_signature, logs) except Exception as e: logging.error(f"Error loading transaction token data: {str(e)}") - - # ToDo - probably optimize - tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in']) - tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out']) - prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']]) - tr["token_in_price"] = prices.get(tr['token_in'], 0) - tr["token_out_price"] = prices.get(tr['token_out'], 0) - tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in'] - tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out'] - - notification = ( - f"Got WH notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['amount_out']} {tr['symbol_out']} ${tr['value_out_USD']}\n" + # ToDo - probably optimize + tr["symbol_in"] = await SolanaAPI.SAPI.get_token_metadata_symbol( + tr["token_in"] ) - logging.info(notification) - await utils.telegram_utils.send_telegram_message(notification) - + tr["symbol_out"] = await SolanaAPI.SAPI.get_token_metadata_symbol( + tr["token_out"] + ) + # ToDo - optimize + # prices = await SolanaAPI.DEX.get_token_prices( + # [tr["token_in"], tr["token_out"]] + # ) + # tr["token_in_price"] = prices.get(tr["token_in"], 0) + # tr["token_out_price"] = prices.get(tr["token_out"], 0) + # tr["value_in_USD"] = prices.get(tr["token_in"], 0) * tr["amount_in"] + # tr["value_out_USD"] = prices.get(tr["token_out"], 0) * tr["amount_out"] + + # notification = f"Got WH notification:: {tr['amount_in']} {tr['symbol_in'] or tr["token_in"]} swapped for {tr['amount_out']} {tr['symbol_out']} ${tr['value_out_USD']}\n" + # logging.info(notification) + # await utils.telegram_utils.send_telegram_message(notification) + # Store the notified transaction in the database # storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature) - copyTransaction = storage.Transaction( - wallet=wallet, - transaction_type=tr["type"], - symbol_in=tr["token_in"], - amount_in=tr["amount_in"] if is_buy else 0, - value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0, - symbol_out=tr["token_out"], - amount_out=tr["amount_out"] if not is_buy else 0, - value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0, - tx_signature=tx_signature - ) - try: await storage.store_transaction(copyTransaction) - except: logging.error(traceback.format_exc()) - + # copyTransaction = storage.Transaction( + # wallet=wallet, + # transaction_type=tr["type"], + # symbol_in=tr["token_in"], + # amount_in=tr["amount_in"] if is_buy else 0, + # value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0, + # symbol_out=tr["token_out"], + # amount_out=tr["amount_out"] if not is_buy else 0, + # value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0, + # tx_signature=tx_signature + # ) + # try: await storage.store_transaction(copyTransaction) + # except: logging.error(traceback.format_exc()) + # Attempt to execute the copytrade transaction try: # await SolanaAPI.SAPI.follow_move(tr) - if on_transaction: - await async_safe_call( on_transaction, tr) + if on_transaction: + await async_safe_call(on_transaction, tr) else: await SolanaAPI.SAPI.follow_move(tr) # Store the successful copytrade transaction @@ -325,7 +376,7 @@ def init_app(tr_handler=None): # Store the failed copytrade transaction # await storage.store_transaction(wallet, "SWAP_FAIL", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature) logging.error(f"Copytrade transaction failed: {e}") - # ToDo - probably optimize + # ToDo - probably optimize await SolanaAPI.DEX.save_token_info() else: logger.info("wh transaction is not a swap. skipping...") @@ -335,11 +386,15 @@ def init_app(tr_handler=None): logging.error(traceback.format_exc()) def get_decimals(token_data): - return token_data["rawTokenAmount"].get("decimals") or token_data["rawTokenAmount"].get("decimalFs", 0) + return token_data["rawTokenAmount"].get("decimals") or token_data[ + "rawTokenAmount" + ].get("decimalFs", 0) + def calculate_amount(token_data): decimals = get_decimals(token_data) token_amount = int(token_data["rawTokenAmount"]["tokenAmount"]) return float(token_amount / 10**decimals) + async def calculate_price_amount(token_data, prices=None): if not prices: prices = await SolanaAPI.DEX.get_token_prices([token_data["mint"]]) @@ -347,19 +402,19 @@ def init_app(tr_handler=None): token_amount = int(token_data["rawTokenAmount"]["tokenAmount"]) return prices[token_data["mint"]] * token_amount / 10**decimals - @app.route('/replay_wh', methods=['POST']) + @app.route("/replay_wh", methods=["POST"]) async def replay_wh(): try: data = request.get_json() - filename = data.get('filename') + filename = data.get("filename") if not filename: return jsonify({"error": "Filename not provided"}), 400 - file_path = os.path.join(SolanaAPI.root_path, 'logs', filename) + file_path = os.path.join(SolanaAPI.root_path, "logs", filename) if not os.path.exists(file_path): return jsonify({"error": "File not found"}), 404 - with open(file_path, 'r') as f: + with open(file_path, "r") as f: log_data = json.load(f) await process_wh(log_data) @@ -368,42 +423,45 @@ def init_app(tr_handler=None): except Exception as e: logging.error(f"Error replaying webhook file: {e}") return jsonify({"error": "Failed to replay webhook file"}), 500 - @app.route('/retry-last-log', methods=['GET']) + + @app.route("/retry-last-log", methods=["GET"]) async def retry_last_log(): - wh = request.args.get('wh', 'false').lower() == 'true' - - latest_log_file = get_latest_log_file(wh) + wh = request.args.get("wh", "false").lower() == "true" + + latest_log_file = get_latest_log_file(wh) if not latest_log_file: return jsonify({"error": "No log files found"}), 404 try: utils.log.info(f"Processing latest log file: {latest_log_file}") - with open(latest_log_file, 'r') as f: + with open(latest_log_file, "r") as f: log = json.load(f) if wh: result = await process_wh(log) else: result = await SolanaAPI.process_log(log) - - return jsonify({ - "file": latest_log_file, - "status": "Log dump processed successfully", - "result": result - }), 200 + + return ( + jsonify( + { + "file": latest_log_file, + "status": "Log dump processed successfully", + "result": result, + } + ), + 200, + ) except Exception as e: utils.log.error(f"Error processing log dump: {e}") return jsonify({"error": "Failed to process log"}), 500 - - - - # # # # + # # # # # AUTHENTICATION # # # # - - @app.route('/login/google/authorized') + + @app.route("/login/google/authorized") def authorized(): # resp = google.authorized_response() # if resp is None or resp.get('access_token') is None: @@ -415,9 +473,8 @@ def init_app(tr_handler=None): # user_info = google.get('userinfo') # user = storage.get_or_create_user(user_info.data['email'], user_info.data['id']) # login_user(user) - return redirect(url_for('index')) - - + return redirect(url_for("index")) + class User(UserMixin): def __init__(self, id, username, email): self.id = id @@ -428,88 +485,107 @@ def init_app(tr_handler=None): def load_user(user_id): user_data = storage.get_user_by_id(user_id) if user_data: - return User(id=user_data['id'], username=user_data['username'], email=user_data['email']) + return User( + id=user_data["id"], + username=user_data["username"], + email=user_data["email"], + ) return None - @app.route('/') + @app.route("/") def index(): - return render_template('index.html') + return render_template("index.html") @login_manager.unauthorized_handler def unauthorized(): - return redirect('/login?next=' + request.path) + return redirect("/login?next=" + request.path) # return jsonify({'error': 'Unauthorized'}), 401 - @app.route('/login', methods=['GET', 'POST']) + @app.route("/login", methods=["GET", "POST"]) def login(): - if request.method == 'POST': - username = request.form.get('username') - password = request.form.get('password') + if request.method == "POST": + username = request.form.get("username") + password = request.form.get("password") user = storage.authenticate_user(username, password) if user: - login_user(User(id=user['id'], username=user['username'], email=user['email'])) - return redirect(url_for('dashboard')) + login_user( + User(id=user["id"], username=user["username"], email=user["email"]) + ) + return redirect(url_for("dashboard")) else: - return render_template('login.html', error='Invalid credentials') - elif request.args.get('google'): + return render_template("login.html", error="Invalid credentials") + elif request.args.get("google"): # Uncomment the following line if Google OAuth is set up # return google.authorize(callback=url_for('authorized', _external=True)) - return render_template('login.html', error='Google OAuth not configured') - return render_template('login.html') + return render_template("login.html", error="Google OAuth not configured") + return render_template("login.html") - @app.route('/logout') + @app.route("/logout") @login_required def logout(): logout_user() - return redirect(url_for('index')) + return redirect(url_for("index")) - @app.route('/dashboard') + @app.route("/dashboard") @login_required def dashboard(): - return render_template('dashboard.html') + return render_template("dashboard.html") - @app.route('/generate_api_key', methods=['POST']) + @app.route("/generate_api_key", methods=["POST"]) @login_required def generate_api_key(): api_key = secrets.token_urlsafe(32) storage.store_api_key(current_user.id, api_key) - return jsonify({'api_key': api_key}) + return jsonify({"api_key": api_key}) - @app.route('/wallet//transactions', methods=['GET']) + @app.route("/wallet//transactions", methods=["GET"]) @login_required @login_required def get_transactions(wallet_id): transactions = storage.get_transactions(wallet_id) return jsonify(transactions) - @app.route('/wallet//holdings', methods=['GET']) + @app.route("/wallet//holdings", methods=["GET"]) @login_required @login_required def get_holdings(wallet_id): holdings = storage.get_holdings(wallet_id) return jsonify(holdings) - - return app + + return app + def teardown_app(): # Close the database connection storage.disconnect() + # Function to find the latest log file -def get_latest_log_file(wh:bool): - log_dir = os.path.join(SolanaAPI.root_path, 'logs') +def get_latest_log_file(wh: bool): + log_dir = os.path.join(SolanaAPI.root_path, "logs") try: # files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))] # filter files mask log_20241005_004103_143116.json if wh: - files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('wh_')] + files = [ + f + for f in os.listdir(log_dir) + if os.path.isfile(os.path.join(log_dir, f)) and f.startswith("wh_") + ] else: - files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('log_')] + files = [ + f + for f in os.listdir(log_dir) + if os.path.isfile(os.path.join(log_dir, f)) and f.startswith("log_") + ] - latest_file = max(files, key=lambda x: os.path.getmtime(os.path.join(log_dir, x))) + latest_file = max( + files, key=lambda x: os.path.getmtime(os.path.join(log_dir, x)) + ) return os.path.join(log_dir, latest_file) except Exception as e: utils.log.error(f"Error fetching latest log file: {e}") return None + export = init_app, teardown_app