This commit is contained in:
Dobromir Popov 2024-11-12 01:50:14 +02:00
parent 8c75d1b650
commit a2b775029a
3 changed files with 152 additions and 46 deletions

View File

@ -23,7 +23,7 @@ TELEGRAM_BOT_TOKEN="6749075936:AAHUHiPTDEIu6JH7S2fQdibwsu6JVG3FNG0"
DISPLAY_CURRENCY=USD
FOLLOW_AMOUNT=2
FOLLOW_AMOUNT=percentage
#FOLLOW_AMOUNT=percentage
LIQUIDITY_TOKENS=EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v,So11111111111111111111111111111111111111112

View File

@ -2,37 +2,56 @@ import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import json
from typing import NamedTuple
class Transaction(NamedTuple):
wallet: str
transaction_type: str
symbol_in: str
amount_in: float
value_in_usd: float
symbol_out: str
amount_out: float
value_out_usd: float
tx_signature: str
from enum import Enum
from datetime import datetime
import prisma
from enum import Enum
import json
from prisma import Prisma
class TransactionStatus(Enum):
PENDING = "PENDING"
SENT = "SENT"
CONFIRMED = "CONFIRMED"
# Initialize the Prisma client
prisma_client = Prisma()
async def init_db():
await prisma_client.connect()
async def store_transaction(transaction_data):
async def store_transaction(transaction: Transaction):
"""
Store a transaction record in the database using a dictionary.
Store a transaction record in the database.
"""
default_data = {
'wallet_id': None,
'timestamp': datetime.now().isoformat(),
'type': None,
'sell_currency': None,
'sell_amount': 0.0,
'sell_value': 0.0,
'buy_currency': None,
'buy_amount': 0.0,
'buy_value': 0.0,
'solana_signature': None,
'details': json.dumps({}),
'status': prisma_client.transactionStatus.ORIGINAL
}
default_data.update(transaction_data)
await prisma_client.transaction.create(data=default_data)
await prisma_client.transaction.create(
data={
'wallet_id': transaction.wallet,
'timestamp': datetime.now().isoformat(),
'type': transaction.transaction_type,
'sell_currency': transaction.symbol_in,
'sell_amount': transaction.amount_in,
'sell_value': transaction.value_in_usd,
'buy_currency': transaction.symbol_out,
'buy_amount': transaction.amount_out,
'buy_value': transaction.value_out_usd,
'solana_signature': transaction.tx_signature,
'details': json.dumps({}),
'status': TransactionStatus.PENDING.value
}
)
async def update_holdings(wallet_id, currency, amount_change):
holding = await prisma_client.holding.find_first(

View File

@ -3,6 +3,7 @@ import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor
import traceback
from flask import Flask, jsonify, request, render_template, redirect, url_for
# from flask_oauthlib.client import OAuth
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
@ -76,15 +77,48 @@ def init_app(tr_handler=None):
await utils.telegram_utils.send_telegram_message(notification)
# Store the notified transaction in the database
await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'], tx_signature)
original_transaction = storage.Transaction(
wallet=wallet,
transaction_type="SWAP",
symbol_in=tr['symbol_in'],
amount_in=tr['amount_in'],
value_in_usd=tr['value_in_USD'],
symbol_out=tr['symbol_out'],
amount_out=tr['amount_out'],
value_out_usd=tr['value_out_USD'],
tx_signature=tx_signature
)
await storage.store_transaction(original_transaction)
# Attempt to execute the copytrade transaction
try:
await SolanaAPI.SAPI.follow_move(tr)
# Store the successful copytrade transaction
await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
follow_transaction = storage.Transaction(
wallet=wallet,
transaction_type="SWAP",
symbol_in=tr['symbol_in'],
amount_in=tr['amount_in'],
value_in_usd=tr['value_in_USD'],
symbol_out=tr['symbol_out'],
amount_out=tr['amount_out'],
value_out_usd=tr['value_out_USD'],
tx_signature=tx_signature
)
await storage.store_transaction(follow_transaction)
except Exception as e:
# Store the failed copytrade transaction
await storage.store_transaction(wallet, "SWAP_FAIL", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
failed_transaction = storage.Transaction(
wallet=wallet,
transaction_type="SWAP_FAIL",
symbol_in=tr['symbol_in'],
amount_in=tr['amount_in'],
value_in_usd=tr['value_in_USD'],
symbol_out=tr['symbol_out'],
amount_out=tr['amount_out'],
value_out_usd=tr['value_out_USD'],
tx_signature=tx_signature
)
await storage.store_transaction(failed_transaction)
logging.error(f"Copytrade transaction failed: {e}")
# ToDo - probably optimize
await SolanaAPI.SAPI.save_token_info()
@ -153,43 +187,65 @@ def init_app(tr_handler=None):
token_inputs = swap_event.get('tokenInputs', [])
token_outputs = swap_event.get('tokenOutputs', [])
if not token_inputs or not token_outputs:
logging.warning("Missing token inputs or outputs")
return
usdcMint = LIQUIDITY_TOKENS[0]
tr = {}
wallet = data[0]['feePayer'] # Using feePayer as the wallet address
tx_signature = data[0]['signature']
try:
# Determine transaction type
if token_inputs and token_outputs and LIQUIDITY_TOKENS[0] in [token_inputs[0]["mint"], token_outputs[0]["mint"]]:
if token_inputs[0]["mint"] == usdcMint:
tr["type"] = "BUY"
else:
tr["type"] = "SELL"
else:
tr["type"] = "SWAP"
if not token_inputs or len(token_inputs) == 0:
logging.info("Assumed USDC as first token. BUY transaction detected")
tr["token_in"] = usdcMint
tr["type"] = "BUY"
tr["amount_in"] = await SolanaAPI.DEX.get_token_prices(
token_outputs[0]["mint"],
int(token_outputs[0]["rawTokenAmount"]["tokenAmount"])
)
prices = await SolanaAPI.DEX.get_token_prices([token_outputs[0]["mint"]])
tr["amount_in"] = prices[token_outputs[0]["mint"]] * int(token_outputs[0]["rawTokenAmount"]["tokenAmount"]) / 10** int(token_outputs[0]["rawTokenAmount"]["decimals"])
else:
token_in = token_inputs[0]
tr["token_in"] = token_in["mint"]
tr["token_in_decimals"] = token_in["rawTokenAmount"]["decimals"]
tr["amount_in"] = float(int(token_in["rawTokenAmount"]["tokenAmount"]) / 10**token_in["rawTokenAmount"]["decimals"])
# 'amount_in': float(token_inputs[0]['rawTokenAmount']['tokenAmount']) / 10**token_inputs[0]['rawTokenAmount']['decimals'],
if not token_outputs or len(token_outputs) == 0:
logging.info("Assumed USDC as second token. SELL transaction detected")
tr["token_out"] = usdcMint
tr["type"] = "SELL"
tr["amount_out"] = await SolanaAPI.DEX.get_token_prices(
token_inputs[0]["mint"],
int(token_inputs[0]["rawTokenAmount"]["tokenAmount"])
)
prices = await SolanaAPI.DEX.get_token_prices([token_inputs[0]["mint"]])
tr["amount_out"] = prices[token_inputs[0]["mint"]] * int(token_inputs[0]["rawTokenAmount"]["tokenAmount"]) / 10** int(token_inputs[0]["rawTokenAmount"]["decimals"])
else:
token_out = token_outputs[0]
tr["token_out"] = token_out["mint"]
tr["token_out_decimals"] = token_out["rawTokenAmount"]["decimals"]
tr["amount_out"] = float(int(token_out["rawTokenAmount"]["tokenAmount"]) / 10**token_out["rawTokenAmount"]["decimals"])
#'amount_out': float(token_outputs[0]['rawTokenAmount']['tokenAmount']) / 10**token_outputs[0]['rawTokenAmount']['decimals'],
# Store transaction in database
if tr["type"] in ["BUY", "SELL"]:
is_buy = tr["type"] == "BUY"
transaction = storage.Transaction(
wallet=wallet,
transaction_type=tr["type"],
symbol_in=tr["token_in"],
amount_in=tr["amount_in"] if is_buy else 0,
value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0,
symbol_out=tr["token_out"],
amount_out=tr["amount_out"] if not is_buy else 0,
value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0,
tx_signature=tx_signature
)
await storage.store_transaction(transaction)
if swap_event.get('nativeInput'): # SOL
token_in = swap_event.get('nativeInput', [])
logger.info(f"Native input (SOL) detected ({token_in["amount"]})")
@ -202,9 +258,7 @@ def init_app(tr_handler=None):
except Exception as e:
logging.error(f"Error loading transaction token data: {str(e)}")
wallet = data[0]['feePayer'] # Using feePayer as the wallet address
tx_signature = data[0]['signature']
# ToDo - probably optimize
tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in'])
tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out'])
@ -221,7 +275,21 @@ def init_app(tr_handler=None):
await utils.telegram_utils.send_telegram_message(notification)
# Store the notified transaction in the database
storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
# storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
copyTransaction = storage.Transaction(
wallet=wallet,
transaction_type=tr["type"],
symbol_in=tr["token_in"],
amount_in=tr["amount_in"] if is_buy else 0,
value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0,
symbol_out=tr["token_out"],
amount_out=tr["amount_out"] if not is_buy else 0,
value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0,
tx_signature=tx_signature
)
try: storage.store_transaction(copyTransaction)
except: logging.error(traceback.format_exc())
# Attempt to execute the copytrade transaction
try:
# await SolanaAPI.SAPI.follow_move(tr)
@ -242,15 +310,34 @@ def init_app(tr_handler=None):
except Exception as e:
logging.error(f"Error processing transaction notification: {str(e)}")
# Log the full traceback for debugging
import traceback
logging.error(traceback.format_exc())
@app.route('/retry', methods=['GET'])
@app.route('/replay_wh', methods=['POST'])
async def replay_wh():
try:
data = request.get_json()
filename = data.get('filename')
if not filename:
return jsonify({"error": "Filename not provided"}), 400
file_path = os.path.join(SolanaAPI.root_path, 'logs', filename)
if not os.path.exists(file_path):
return jsonify({"error": "File not found"}), 404
with open(file_path, 'r') as f:
log_data = json.load(f)
await process_wh(log_data)
return jsonify({"status": "Replay successful"}), 200
except Exception as e:
logging.error(f"Error replaying webhook file: {e}")
return jsonify({"error": "Failed to replay webhook file"}), 500
@app.route('/retry-last-log', methods=['GET'])
async def retry_last_log():
wh = request.args.get('wh', 'false').lower() == 'true'
latest_log_file = get_latest_log_file(wh)
latest_log_file = get_latest_log_file(wh)
if not latest_log_file:
return jsonify({"error": "No log files found"}), 404