From a2b775029ae80baa606b1fdea5fc2b807071f2a5 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 12 Nov 2024 01:50:14 +0200 Subject: [PATCH] replay --- crypto/sol/.env | 2 +- crypto/sol/modules/storage.py | 59 ++++++++++----- crypto/sol/modules/webui.py | 137 +++++++++++++++++++++++++++------- 3 files changed, 152 insertions(+), 46 deletions(-) diff --git a/crypto/sol/.env b/crypto/sol/.env index a7bcbe2..305104b 100644 --- a/crypto/sol/.env +++ b/crypto/sol/.env @@ -23,7 +23,7 @@ TELEGRAM_BOT_TOKEN="6749075936:AAHUHiPTDEIu6JH7S2fQdibwsu6JVG3FNG0" DISPLAY_CURRENCY=USD FOLLOW_AMOUNT=2 -FOLLOW_AMOUNT=percentage +#FOLLOW_AMOUNT=percentage LIQUIDITY_TOKENS=EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v,So11111111111111111111111111111111111111112 diff --git a/crypto/sol/modules/storage.py b/crypto/sol/modules/storage.py index 721736a..0c6d351 100644 --- a/crypto/sol/modules/storage.py +++ b/crypto/sol/modules/storage.py @@ -2,37 +2,56 @@ import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -import json +from typing import NamedTuple + +class Transaction(NamedTuple): + wallet: str + transaction_type: str + symbol_in: str + amount_in: float + value_in_usd: float + symbol_out: str + amount_out: float + value_out_usd: float + tx_signature: str + +from enum import Enum from datetime import datetime -import prisma +from enum import Enum +import json from prisma import Prisma +class TransactionStatus(Enum): + PENDING = "PENDING" + SENT = "SENT" + CONFIRMED = "CONFIRMED" + # Initialize the Prisma client prisma_client = Prisma() async def init_db(): await prisma_client.connect() -async def store_transaction(transaction_data): +async def store_transaction(transaction: Transaction): """ - Store a transaction record in the database using a dictionary. + Store a transaction record in the database. """ - default_data = { - 'wallet_id': None, - 'timestamp': datetime.now().isoformat(), - 'type': None, - 'sell_currency': None, - 'sell_amount': 0.0, - 'sell_value': 0.0, - 'buy_currency': None, - 'buy_amount': 0.0, - 'buy_value': 0.0, - 'solana_signature': None, - 'details': json.dumps({}), - 'status': prisma_client.transactionStatus.ORIGINAL - } - default_data.update(transaction_data) - await prisma_client.transaction.create(data=default_data) + await prisma_client.transaction.create( + data={ + 'wallet_id': transaction.wallet, + 'timestamp': datetime.now().isoformat(), + 'type': transaction.transaction_type, + 'sell_currency': transaction.symbol_in, + 'sell_amount': transaction.amount_in, + 'sell_value': transaction.value_in_usd, + 'buy_currency': transaction.symbol_out, + 'buy_amount': transaction.amount_out, + 'buy_value': transaction.value_out_usd, + 'solana_signature': transaction.tx_signature, + 'details': json.dumps({}), + 'status': TransactionStatus.PENDING.value + } + ) async def update_holdings(wallet_id, currency, amount_change): holding = await prisma_client.holding.find_first( diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py index d99b13c..3c65f80 100644 --- a/crypto/sol/modules/webui.py +++ b/crypto/sol/modules/webui.py @@ -3,6 +3,7 @@ import sys import asyncio from concurrent.futures import ThreadPoolExecutor +import traceback from flask import Flask, jsonify, request, render_template, redirect, url_for # from flask_oauthlib.client import OAuth from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user @@ -76,15 +77,48 @@ def init_app(tr_handler=None): await utils.telegram_utils.send_telegram_message(notification) # Store the notified transaction in the database - await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'], tx_signature) + original_transaction = storage.Transaction( + wallet=wallet, + transaction_type="SWAP", + symbol_in=tr['symbol_in'], + amount_in=tr['amount_in'], + value_in_usd=tr['value_in_USD'], + symbol_out=tr['symbol_out'], + amount_out=tr['amount_out'], + value_out_usd=tr['value_out_USD'], + tx_signature=tx_signature + ) + await storage.store_transaction(original_transaction) # Attempt to execute the copytrade transaction try: await SolanaAPI.SAPI.follow_move(tr) # Store the successful copytrade transaction - await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature) + follow_transaction = storage.Transaction( + wallet=wallet, + transaction_type="SWAP", + symbol_in=tr['symbol_in'], + amount_in=tr['amount_in'], + value_in_usd=tr['value_in_USD'], + symbol_out=tr['symbol_out'], + amount_out=tr['amount_out'], + value_out_usd=tr['value_out_USD'], + tx_signature=tx_signature + ) + await storage.store_transaction(follow_transaction) except Exception as e: # Store the failed copytrade transaction - await storage.store_transaction(wallet, "SWAP_FAIL", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature) + failed_transaction = storage.Transaction( + wallet=wallet, + transaction_type="SWAP_FAIL", + symbol_in=tr['symbol_in'], + amount_in=tr['amount_in'], + value_in_usd=tr['value_in_USD'], + symbol_out=tr['symbol_out'], + amount_out=tr['amount_out'], + value_out_usd=tr['value_out_USD'], + tx_signature=tx_signature + ) + await storage.store_transaction(failed_transaction) logging.error(f"Copytrade transaction failed: {e}") # ToDo - probably optimize await SolanaAPI.SAPI.save_token_info() @@ -153,43 +187,65 @@ def init_app(tr_handler=None): token_inputs = swap_event.get('tokenInputs', []) token_outputs = swap_event.get('tokenOutputs', []) - if not token_inputs or not token_outputs: - logging.warning("Missing token inputs or outputs") - return - + usdcMint = LIQUIDITY_TOKENS[0] tr = {} + wallet = data[0]['feePayer'] # Using feePayer as the wallet address + tx_signature = data[0]['signature'] + try: + # Determine transaction type + if token_inputs and token_outputs and LIQUIDITY_TOKENS[0] in [token_inputs[0]["mint"], token_outputs[0]["mint"]]: + if token_inputs[0]["mint"] == usdcMint: + tr["type"] = "BUY" + else: + tr["type"] = "SELL" + else: + tr["type"] = "SWAP" + if not token_inputs or len(token_inputs) == 0: logging.info("Assumed USDC as first token. BUY transaction detected") tr["token_in"] = usdcMint tr["type"] = "BUY" - tr["amount_in"] = await SolanaAPI.DEX.get_token_prices( - token_outputs[0]["mint"], - int(token_outputs[0]["rawTokenAmount"]["tokenAmount"]) - ) + prices = await SolanaAPI.DEX.get_token_prices([token_outputs[0]["mint"]]) + tr["amount_in"] = prices[token_outputs[0]["mint"]] * int(token_outputs[0]["rawTokenAmount"]["tokenAmount"]) / 10** int(token_outputs[0]["rawTokenAmount"]["decimals"]) + else: token_in = token_inputs[0] tr["token_in"] = token_in["mint"] tr["token_in_decimals"] = token_in["rawTokenAmount"]["decimals"] tr["amount_in"] = float(int(token_in["rawTokenAmount"]["tokenAmount"]) / 10**token_in["rawTokenAmount"]["decimals"]) - # 'amount_in': float(token_inputs[0]['rawTokenAmount']['tokenAmount']) / 10**token_inputs[0]['rawTokenAmount']['decimals'], if not token_outputs or len(token_outputs) == 0: logging.info("Assumed USDC as second token. SELL transaction detected") tr["token_out"] = usdcMint tr["type"] = "SELL" - tr["amount_out"] = await SolanaAPI.DEX.get_token_prices( - token_inputs[0]["mint"], - int(token_inputs[0]["rawTokenAmount"]["tokenAmount"]) - ) + prices = await SolanaAPI.DEX.get_token_prices([token_inputs[0]["mint"]]) + tr["amount_out"] = prices[token_inputs[0]["mint"]] * int(token_inputs[0]["rawTokenAmount"]["tokenAmount"]) / 10** int(token_inputs[0]["rawTokenAmount"]["decimals"]) + else: token_out = token_outputs[0] tr["token_out"] = token_out["mint"] tr["token_out_decimals"] = token_out["rawTokenAmount"]["decimals"] tr["amount_out"] = float(int(token_out["rawTokenAmount"]["tokenAmount"]) / 10**token_out["rawTokenAmount"]["decimals"]) - #'amount_out': float(token_outputs[0]['rawTokenAmount']['tokenAmount']) / 10**token_outputs[0]['rawTokenAmount']['decimals'], - + + # Store transaction in database + if tr["type"] in ["BUY", "SELL"]: + is_buy = tr["type"] == "BUY" + + transaction = storage.Transaction( + wallet=wallet, + transaction_type=tr["type"], + symbol_in=tr["token_in"], + amount_in=tr["amount_in"] if is_buy else 0, + value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0, + symbol_out=tr["token_out"], + amount_out=tr["amount_out"] if not is_buy else 0, + value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0, + tx_signature=tx_signature + ) + await storage.store_transaction(transaction) + if swap_event.get('nativeInput'): # SOL token_in = swap_event.get('nativeInput', []) logger.info(f"Native input (SOL) detected ({token_in["amount"]})") @@ -202,9 +258,7 @@ def init_app(tr_handler=None): except Exception as e: logging.error(f"Error loading transaction token data: {str(e)}") - wallet = data[0]['feePayer'] # Using feePayer as the wallet address - tx_signature = data[0]['signature'] - + # ToDo - probably optimize tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in']) tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out']) @@ -221,7 +275,21 @@ def init_app(tr_handler=None): await utils.telegram_utils.send_telegram_message(notification) # Store the notified transaction in the database - storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature) + # storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature) + copyTransaction = storage.Transaction( + wallet=wallet, + transaction_type=tr["type"], + symbol_in=tr["token_in"], + amount_in=tr["amount_in"] if is_buy else 0, + value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0, + symbol_out=tr["token_out"], + amount_out=tr["amount_out"] if not is_buy else 0, + value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0, + tx_signature=tx_signature + ) + try: storage.store_transaction(copyTransaction) + except: logging.error(traceback.format_exc()) + # Attempt to execute the copytrade transaction try: # await SolanaAPI.SAPI.follow_move(tr) @@ -242,15 +310,34 @@ def init_app(tr_handler=None): except Exception as e: logging.error(f"Error processing transaction notification: {str(e)}") # Log the full traceback for debugging - import traceback logging.error(traceback.format_exc()) - @app.route('/retry', methods=['GET']) + @app.route('/replay_wh', methods=['POST']) + async def replay_wh(): + try: + data = request.get_json() + filename = data.get('filename') + if not filename: + return jsonify({"error": "Filename not provided"}), 400 + + file_path = os.path.join(SolanaAPI.root_path, 'logs', filename) + if not os.path.exists(file_path): + return jsonify({"error": "File not found"}), 404 + + with open(file_path, 'r') as f: + log_data = json.load(f) + + await process_wh(log_data) + return jsonify({"status": "Replay successful"}), 200 + + except Exception as e: + logging.error(f"Error replaying webhook file: {e}") + return jsonify({"error": "Failed to replay webhook file"}), 500 @app.route('/retry-last-log', methods=['GET']) async def retry_last_log(): wh = request.args.get('wh', 'false').lower() == 'true' - latest_log_file = get_latest_log_file(wh) + latest_log_file = get_latest_log_file(wh) if not latest_log_file: return jsonify({"error": "No log files found"}), 404