diff --git a/crypto/sol/.env b/crypto/sol/.env
index 872d843..1e5dfd2 100644
--- a/crypto/sol/.env
+++ b/crypto/sol/.env
@@ -23,18 +23,19 @@ TELEGRAM_BOT_TOKEN="6749075936:AAHUHiPTDEIu6JH7S2fQdibwsu6JVG3FNG0"
DISPLAY_CURRENCY=USD
#FOLLOW_AMOUNT=3
-FOLLOW_AMOUNT=percentage
+# proportional, xx%,
+FOLLOW_AMOUNT=5%
LIQUIDITY_TOKENS=EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v,So11111111111111111111111111111111111111112
PRIORITY=1 # 0-10, 5 = market cap, 10 twice market cap
DO_WATCH_WALLET=True
# Niki's to Sync: [PROD]
-#FOLLOWED_WALLET="3EZkyU9zQRrHPnrNovDiRCA9Yg3wLK35u9cdrcGcszi1"
-#YOUR_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV"
-#PK=3FxXjNrtEqwAKYj4BpkuLAJPzuKRWykkvjeBYQEVuFqRFWRm9eVcWrrYKbns2M31ESMoASG2WV39w9Dpx532sPUH
+FOLLOWED_WALLET="7TS3ATxhEVyah29gU7Z6zwwsNpWLm88ZPBig1dwyMFip"
+YOUR_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV"
+PK=3FxXjNrtEqwAKYj4BpkuLAJPzuKRWykkvjeBYQEVuFqRFWRm9eVcWrrYKbns2M31ESMoASG2WV39w9Dpx532sPUH
# Sync to main [DEV]
-FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV"
-YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5"
-PK=5ccrMf3BFFE1HMsXt17btK1tMSNay7aBoY27saPHrqg2JEjxKBmBbxUABD9Jh7Gisf1bhM51oGzWdyLUgHdrUJPw
\ No newline at end of file
+#FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV"
+#YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5"
+#PK=5ccrMf3BFFE1HMsXt17btK1tMSNay7aBoY27saPHrqg2JEjxKBmBbxUABD9Jh7Gisf1bhM51oGzWdyLUgHdrUJPw
\ No newline at end of file
diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py
index 0be36c7..8b4a95d 100644
--- a/crypto/sol/modules/SolanaAPI.py
+++ b/crypto/sol/modules/SolanaAPI.py
@@ -48,7 +48,7 @@ import random
import websockets
from typing import Dict, List, Optional
import requests
-from datetime import datetime
+from datetime import datetime, timedelta
from solana.rpc.types import TokenAccountOpts, TxOpts
from typing import List, Dict, Any, Tuple
import traceback
@@ -134,6 +134,9 @@ class SolanaWS:
elif 'error' in response_data:
logger.error(f"Error in WebSocket RPC call: {response_data['error']}")
return None
+ # if result is integer
+ elif "id" in response_data and int(response_data['id']) == 1:
+ return int(response_data['result'])
else:
logger.warning(f"Unexpected response: {response_data}")
return None
@@ -789,30 +792,36 @@ class SolanaAPI:
async def follow_move(self,move):
try:
- your_balances = await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
- your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
- if your_balance_info is not None:
- # Use the balance
- print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
- else:
- print(f"No ballance found for {move['symbol_in']}. Skipping move.")
- await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
- return
-
- your_balance = your_balance_info['amount']
-
+ try:
+ your_balances = await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
+ your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
+ if your_balance_info is not None:
+ # Use the balance
+ print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
+ # else:
+ # print(f"No ballance found for {move['symbol_in']}. Skipping move.")
+ # await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
+ # return
+
+ your_balance = your_balance_info['amount']
+
+ if not your_balance:
+ msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move."
+ logging.warning(msg)
+ await telegram_utils.send_telegram_message(msg)
+ return
+
+ except Exception as e:
+ logging.error(f"Error fetching your balance: {e}")
+ if FOLLOW_AMOUNT == 'proportional':
+ return
token_info = DEX.TOKENS_INFO.get(move['token_in'])
token_name_in = token_info.get('symbol') or await SAPI.get_token_metadata_symbol(move['token_in'])
token_name_out = DEX.TOKENS_INFO[move['token_out']].get('symbol') or await SAPI.get_token_metadata_symbol(move['token_out'])
- if not your_balance:
- msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move."
- logging.warning(msg)
- await telegram_utils.send_telegram_message(msg)
- return
- if FOLLOW_AMOUNT == 'percentage':
+ if FOLLOW_AMOUNT == 'proportional':
# Calculate the amount to swap based on the same percentage as the followed move
if move.get('percentage_swapped') is None:
followed_ballances = await DEX.get_wallet_balances(FOLLOWED_WALLET, doGetTokenName=False)
@@ -827,11 +836,20 @@ class SolanaAPI:
amount_to_swap = 100
else:
amount_to_swap = your_balance * (move['percentage_swapped'] / 100)
- elif FOLLOW_AMOUNT == 'exact':
- amount_to_swap = move['amount_in']
+ # if contains %, then calculate the amount to swap based on the same percentage as the followed move
+ elif '%' in FOLLOW_AMOUNT:
+ try:
+ percentage = float(FOLLOW_AMOUNT.strip('%'))
+ amount_to_swap = move['amount_in'] * (percentage / 100)
+ except ValueError:
+ msg = f"Move not followed:\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number."
+ logging.warning(msg)
+ await telegram_utils.send_telegram_message(msg)
+ return
+
else:
try:
- fixed_amount = float(FOLLOW_AMOUNT) # un USD
+ fixed_amount = float(FOLLOW_AMOUNT) # in USD
fixed_amount_in_token = fixed_amount / move["token_in_price"]
amount_to_swap = min(fixed_amount_in_token, your_balance)
except ValueError:
@@ -840,7 +858,7 @@ class SolanaAPI:
await telegram_utils.send_telegram_message(msg)
return
- amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have
+ # amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have
decimals = token_info.get('decimals')
# Convert to lamports
@@ -848,26 +866,26 @@ class SolanaAPI:
amount_lamports = int(amount_to_swap * 10**decimals)
logging.debug(f"Calculated amount in lamports: {amount_lamports}")
- if your_balance < amount_to_swap: # should not happen
- msg = (
- f"Warning:\n"
- f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount_lamports}).\n This will probably fail. But we will try anyway."
- )
- logging.warning(msg)
- await telegram_utils.send_telegram_message(msg)
+ # if your_balance < amount_to_swap: # should not happen
+ # msg = (
+ # f"Warning:\n"
+ # f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount_lamports}).\n This will probably fail. But we will try anyway."
+ # )
+ # logging.warning(msg)
+ # await telegram_utils.send_telegram_message(msg)
try:
- try:
- notification = (
- f"Initiating move:\n"
- f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}"
- + (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "")
- )
- # logging.info(notification)
- # error_logger.info(notification)
- # await telegram_utils.send_telegram_message(notification)
- except Exception as e:
- logging.error(f"Error sending notification: {e}")
+ # try:
+ # notification = (
+ # f"Initiating move:\n"
+ # f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}"
+ # + (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "")
+ # )
+ # # logging.info(notification)
+ # # error_logger.info(notification)
+ # # await telegram_utils.send_telegram_message(notification)
+ # except Exception as e:
+ # logging.error(f"Error sending notification: {e}")
if self.pk is None:
self.pk = await get_pk()
@@ -877,6 +895,7 @@ class SolanaAPI:
async_client = AsyncClient(SOLANA_WS_URL)
jupiter = Jupiter(async_client, private_key)
+
# https://station.jup.ag/api-v6/post-swap
#transaction_data = await jupiter.swap(
transaction_data = await self.swap_on_jupiter(
@@ -889,6 +908,7 @@ class SolanaAPI:
logging.info(f"Initiating move. Transaction data:\n {transaction_data}")
raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
+
# working - no priority fee
signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message))
signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
@@ -896,8 +916,9 @@ class SolanaAPI:
opts = TxOpts(
skip_preflight=False,
preflight_commitment=Processed,
+ max_retries=5 # Add retries for network issues
)
-
+
# send the transaction
result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
@@ -913,7 +934,10 @@ class SolanaAPI:
logging.warning(f"Failed to get transaction details for {transaction_id}.\n Probably transaction failed. Retrying again...")
await asyncio.sleep(3)
except Exception as e:
- decoded_data = ''# base64.b64decode(transaction_data)
+ # decode transacion data (try base58/64)
+ # decoded_data = base58.b58decode(transaction_data).decode('utf-8')
+ # decoded_data = base64.b64decode(transaction_data).decode('utf-8')
+ decoded_data = None
error_message = f"Move Failed:\n{str(e)}\n{decoded_data}\n{move}"
logging.error(error_message)
# log the errors to /logs/errors.log
@@ -1045,7 +1069,8 @@ class SolanaDEX:
token_info = self.TOKENS_INFO.setdefault(token, {})
if 'symbol' not in token_info:
token_info['symbol'] = await SAPI.get_token_metadata_symbol(token)
- token_info['price'] = price
+ token_info['price'] = price
+ token_info['lastUpdated'] = datetime.now().isoformat()
return prices
@@ -1186,6 +1211,10 @@ class SolanaDEX:
asyncio.set_event_loop(loop)
logging.info(f"Getting balances for wallet: {wallet_address}")
response = None
+ # if ballances got in last 2 minutes, return them
+ if "lastUpdated" in self.TOKENS_INFO and datetime.fromisoformat(self.TOKENS_INFO["lastUpdated"]) > datetime.now() - timedelta(minutes=2):
+ logging.info(f"Using cached balances for wallet: {wallet_address}")
+ return self.TOKENS_INFO
try:
response = await self.solana_client.get_token_accounts_by_owner_json_parsed(
Pubkey.from_string(wallet_address),
@@ -1204,46 +1233,46 @@ class SolanaDEX:
if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info:
mint = info['mint']
decimals = int(info['tokenAmount']['decimals'])
- amount = int(info['tokenAmount']['amount'])
- amount = int(amount)
- if amount > 1:
- amount = float(amount / 10**decimals)
- if mint in self.TOKENS_INFO:
- token_name = self.TOKENS_INFO[mint].get('symbol')
- elif doGetTokenName:
- token_name = await SAPI.get_token_metadata_symbol(mint) or 'N/A'
- self.TOKENS_INFO[mint] = {'symbol': token_name}
- await asyncio.sleep(2)
-
- self.TOKENS_INFO[mint]['holdedAmount'] = round(amount, decimals)
- self.TOKENS_INFO[mint]['decimals'] = decimals
- balances[mint] = {
- 'name': token_name or 'N/A',
- 'address': mint,
- 'amount': amount,
- 'decimals': decimals
- }
- try:
- logging.debug(f"Account balance for {token_name} ({mint}): {amount}")
- except Exception as e:
- logging.error(f"Error logging account balance: {str(e)}")
+ amount = float(info['tokenAmount']['uiAmountString'])
+ # amount = float(info['tokenAmount']['amount'])
+ # amount = float(amount / 10**decimals)
+ token_name = None
+ if mint in self.TOKENS_INFO:
+ token_name = self.TOKENS_INFO[mint].get('symbol')
+ elif doGetTokenName:
+ token_name = await SAPI.get_token_metadata_symbol(mint)
+ await asyncio.sleep(2)
+ balances[mint] = {
+ 'name': token_name or 'N/A',
+ 'address': mint,
+ 'amount': amount,
+ 'decimals': decimals
+ }
+ self.TOKENS_INFO[mint] = {'symbol': token_name}
+ self.TOKENS_INFO[mint] = self.TOKENS_INFO[mint].update(balances[mint])
+
+ try:
+ logging.debug(f"Account balance for {token_name or "N/A"} ({mint}): {amount}")
+ except Exception as e:
+ logging.error(f"Error logging account balance: {str(e)}")
else:
logging.warning(f"Unexpected data format for account: {account}")
except Exception as e:
logging.error(f"Error parsing account data: {str(e)}")
-
- sol_balance = await self.solana_client.get_balance(Pubkey.from_string(wallet_address))
- if sol_balance.value is not None:
- balances['SOL'] = {
- 'name': 'SOL',
- 'address': 'SOL',
- 'amount': sol_balance.value / 1e9
- }
- else:
- logging.warning(f"SOL balance response missing for wallet: {wallet_address}")
+ self.TOKENS_INFO["lastUpdated"] = datetime.now().isoformat()
+
+ # sol_balance = await self.solana_client.get_balance(Pubkey.from_string(wallet_address))
+ # if sol_balance.value is not None:
+ # balances['SOL'] = {
+ # 'name': 'SOL',
+ # 'address': 'SOL',
+ # 'amount': sol_balance.value / 1e9
+ # }
+ # else:
+ # logging.warning(f"SOL balance response missing for wallet: {wallet_address}")
except Exception as e:
- logging.error(f"Error getting wallet balances: {str(e)}")
+ logging.error(f"Error getting wallet balances: {str(e)} {e.error_msg}")
if response and response.value:
logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}")
else:
diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py
index 3449343..20aca8c 100644
--- a/crypto/sol/modules/webui.py
+++ b/crypto/sol/modules/webui.py
@@ -5,15 +5,24 @@ from concurrent.futures import ThreadPoolExecutor
import traceback
from flask import Flask, jsonify, request, render_template, redirect, url_for
+
# from flask_oauthlib.client import OAuth
-from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
+from flask_login import (
+ LoginManager,
+ UserMixin,
+ login_user,
+ login_required,
+ logout_user,
+ current_user,
+)
import secrets
import json
+
# from crypto.sol.config import LIQUIDITY_TOKENS
from config import LIQUIDITY_TOKENS, YOUR_WALLET
from modules import storage, utils, SolanaAPI
-from modules.utils import async_safe_call, decode_instruction_data
+from modules.utils import async_safe_call, decode_instruction_data
from modules.storage import Storage
import os
import logging
@@ -21,16 +30,19 @@ from datetime import datetime
on_transaction = None
+
def init_app(tr_handler=None):
global on_transaction
on_transaction = tr_handler
- app = Flask(__name__, template_folder='../templates', static_folder='../static')
-
+ app = Flask(__name__, template_folder="../templates", static_folder="../static")
+
app.secret_key = secrets.token_hex(16)
- executor = ThreadPoolExecutor(max_workers=10) # Adjust the number of workers as needed
+ executor = ThreadPoolExecutor(
+ max_workers=10
+ ) # Adjust the number of workers as needed
login_manager = LoginManager(app)
- login_manager.login_view = 'login'
-
+ login_manager.login_view = "login"
+
storage = Storage()
# Ensure database connection
@@ -55,17 +67,18 @@ def init_app(tr_handler=None):
# authorize_url='https://accounts.google.com/o/oauth2/auth',
# )
-
login_manager = LoginManager()
login_manager.init_app(app)
-
+
logger = logging.getLogger(__name__)
-
+
# API
- @app.route('/tr//', methods=['GET', 'POST'])
+ @app.route("/tr//", methods=["GET", "POST"])
async def transaction_notified(wallet, tx_signature):
try:
- logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}")
+ logger.info(
+ f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}"
+ )
request_data = request.get_json() if request.is_json else None
if not request_data:
# Process the transaction
@@ -73,73 +86,76 @@ def init_app(tr_handler=None):
tr = await SolanaAPI.SAPI.get_transaction_details_info(tx_signature, [])
else:
tr = request_data
-
-
- # ToDo - probably optimize
- tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in'])
- tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out'])
- prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']])
- tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in']
- tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out']
-
- notification = (
- f"Got TXN notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n"
+
+ # ToDo - probably optimize
+ tr["symbol_in"] = await SolanaAPI.SAPI.get_token_metadata_symbol(
+ tr["token_in"]
)
+ tr["symbol_out"] = await SolanaAPI.SAPI.get_token_metadata_symbol(
+ tr["token_out"]
+ )
+ prices = await SolanaAPI.DEX.get_token_prices(
+ [tr["token_in"], tr["token_out"]]
+ )
+ tr["value_in_USD"] = prices.get(tr["token_in"], 0) * tr["amount_in"]
+ tr["value_out_USD"] = prices.get(tr["token_out"], 0) * tr["amount_out"]
+
+ notification = f"Got TXN notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n"
logging.info(notification)
await utils.telegram_utils.send_telegram_message(notification)
-
+
# Store the notified transaction in the database
- original_transaction = storage.Transaction(
- wallet=wallet,
- transaction_type="SWAP",
- symbol_in=tr['symbol_in'],
- amount_in=tr['amount_in'],
- value_in_usd=tr['value_in_USD'],
- symbol_out=tr['symbol_out'],
- amount_out=tr['amount_out'],
- value_out_usd=tr['value_out_USD'],
- tx_signature=tx_signature
- )
- await storage.store_transaction(original_transaction)
- # Attempt to execute the copytrade transaction
+ # original_transaction = storage.Transaction(
+ # wallet=wallet,
+ # transaction_type="SWAP",
+ # symbol_in=tr['symbol_in'],
+ # amount_in=tr['amount_in'],
+ # value_in_usd=tr['value_in_USD'],
+ # symbol_out=tr['symbol_out'],
+ # amount_out=tr['amount_out'],
+ # value_out_usd=tr['value_out_USD'],
+ # tx_signature=tx_signature
+ # )
+ # await storage.store_transaction(original_transaction)
+ # # Attempt to execute the copytrade transaction
try:
await SolanaAPI.SAPI.follow_move(tr)
# Store the successful copytrade transaction
- follow_transaction = storage.Transaction(
- wallet=wallet,
- transaction_type="SWAP",
- symbol_in=tr['symbol_in'],
- amount_in=tr['amount_in'],
- value_in_usd=tr['value_in_USD'],
- symbol_out=tr['symbol_out'],
- amount_out=tr['amount_out'],
- value_out_usd=tr['value_out_USD'],
- tx_signature=tx_signature
- )
- await storage.store_transaction(follow_transaction)
+ # follow_transaction = storage.Transaction(
+ # wallet=wallet,
+ # transaction_type="SWAP",
+ # symbol_in=tr['symbol_in'],
+ # amount_in=tr['amount_in'],
+ # value_in_usd=tr['value_in_USD'],
+ # symbol_out=tr['symbol_out'],
+ # amount_out=tr['amount_out'],
+ # value_out_usd=tr['value_out_USD'],
+ # tx_signature=tx_signature
+ # )
+ # await storage.store_transaction(follow_transaction)
except Exception as e:
- # Store the failed copytrade transaction
- failed_transaction = storage.Transaction(
- wallet=wallet,
- transaction_type="SWAP_FAIL",
- symbol_in=tr['symbol_in'],
- amount_in=tr['amount_in'],
- value_in_usd=tr['value_in_USD'],
- symbol_out=tr['symbol_out'],
- amount_out=tr['amount_out'],
- value_out_usd=tr['value_out_USD'],
- tx_signature=tx_signature
- )
- await storage.store_transaction(failed_transaction)
+ # # Store the failed copytrade transaction
+ # failed_transaction = storage.Transaction(
+ # wallet=wallet,
+ # transaction_type="SWAP_FAIL",
+ # symbol_in=tr['symbol_in'],
+ # amount_in=tr['amount_in'],
+ # value_in_usd=tr['value_in_USD'],
+ # symbol_out=tr['symbol_out'],
+ # amount_out=tr['amount_out'],
+ # value_out_usd=tr['value_out_USD'],
+ # tx_signature=tx_signature
+ # )
+ # await storage.store_transaction(failed_transaction)
logging.error(f"Copytrade transaction failed: {e}")
- # ToDo - probably optimize
+ # ToDo - probably optimize
await SolanaAPI.SAPI.save_token_info()
return jsonify(tr), 200
except Exception as e:
logging.error(f"Error processing transaction: {e}")
return jsonify({"error": "Failed to process transaction"}), 500
- @app.route('/wh', methods=['POST'])
+ @app.route("/wh", methods=["POST"])
async def webhook():
try:
current_time = datetime.now().strftime("%Y%m%d-%H%M%S")
@@ -152,29 +168,30 @@ def init_app(tr_handler=None):
else:
logger.info(f"Webhook data: {request_data}")
# save dump to /cache/last-webhook-{datetime}.json
-
- with open( os.path.join(SolanaAPI.root_path, 'logs', f'wh_{current_time}.json') , 'w') as f:
+
+ with open(
+ os.path.join(SolanaAPI.root_path, "logs", f"wh_{current_time}.json"),
+ "w",
+ ) as f:
json.dump(request_data, f)
-
-
+
if "meta" in request_data[0]:
meta = request_data[0]["meta"]
-
+
# Parse inner instructions
for inner_ix in meta.get("innerInstructions", []):
for instruction in inner_ix.get("instructions", []):
decoded = decode_instruction_data(instruction["data"])
logger.info(f"Instruction data decoded: {decoded}")
-
+
# Example of pattern matching for specific instruction types
- if decoded["instruction_type"] == 1: # Example: swap instruction
+ if (
+ decoded["instruction_type"] == 1
+ ): # Example: swap instruction
# Parse parameters based on program type
# Different DEXes will have different parameter layouts
pass
-
-
-
-
+
# await process_wh(request_data)
# don't wait for the process to finish
executor.submit(asyncio.run, process_wh(request_data))
@@ -182,33 +199,37 @@ def init_app(tr_handler=None):
except Exception as e:
logging.error(f"Error processing webhook: {e}")
return jsonify({"error": "Failed to process webhook"}), 500
-
- # Flask route to retry processing the last log
-
- async def process_wh(data):
+
+ # Flask route to retry processing the last log
+
+ async def process_wh(data):
global on_transaction
-
+
try:
- if data[0].get('type') == "SWAP":
- swap_event = data[0]['events'].get('swap')
+ if data[0].get("type") == "SWAP":
+ swap_event = data[0]["events"].get("swap")
if not swap_event:
logging.warning("No swap event found in data")
return
-
+
# Extract token input details from the first token input
- token_inputs = swap_event.get('tokenInputs', [])
- token_outputs = swap_event.get('tokenOutputs', [])
-
+ token_inputs = swap_event.get("tokenInputs", [])
+ token_outputs = swap_event.get("tokenOutputs", [])
+
tr = {}
- wallet = data[0]['feePayer'] # Using feePayer as the wallet address
- tx_signature = data[0]['signature']
+ wallet = data[0]["feePayer"] # Using feePayer as the wallet address
+ tx_signature = data[0]["signature"]
usdcMint = LIQUIDITY_TOKENS[0]
solMint = LIQUIDITY_TOKENS[1]
-
-
+
try:
# Determine transaction type
- if token_inputs and token_outputs and LIQUIDITY_TOKENS[0] in [token_inputs[0]["mint"], token_outputs[0]["mint"]]:
+ if (
+ token_inputs
+ and token_outputs
+ and LIQUIDITY_TOKENS[0]
+ in [token_inputs[0]["mint"], token_outputs[0]["mint"]]
+ ):
if token_inputs[0]["mint"] == usdcMint:
tr["type"] = "BUY"
else:
@@ -216,107 +237,137 @@ def init_app(tr_handler=None):
else:
tr["type"] = "SWAP"
-
- if swap_event.get('nativeInput', None):
+ if swap_event.get("nativeInput", None):
tr["token_in"] = solMint
- tr["amount_in"] = int(swap_event.get('nativeInput')["amount"])/ 10**6
+ tr["amount_in"] = (
+ int(swap_event.get("nativeInput")["amount"]) / 10**9
+ )
tr["type"] = "BUY"
- tr["token_in_decimals"] = 6
-
- if swap_event.get('nativeOutput', None):
+ tr["token_in_decimals"] = 9
+
+ if swap_event.get("nativeOutput", None):
tr["token_out"] = solMint
- tr["amount_out"] = int(swap_event.get('nativeOutput')["amount"]) / 10**6
+ tr["amount_out"] = (
+ int(swap_event.get("nativeOutput")["amount"]) / 10**9
+ )
tr["type"] = "SELL"
- tr["token_out_decimals"] = 6
+ tr["token_out_decimals"] = 9
- if not token_inputs or len(token_inputs) == 0:
- logging.info("Assumed USDC as first token. BUY transaction detected")
- tr["token_in"] = usdcMint
- tr["type"] = "BUY"
- tr["amount_in"] = await calculate_price_amount(token_outputs[0])
- else:
- token_in = token_inputs[0]
- tr["token_in"] = token_in["mint"]
- tr["token_in_decimals"] = get_decimals(token_in)
- tr["amount_in"] = calculate_amount(token_in)
+ # if we don't have token_in yet
+ if "token_in" not in tr:
+ if not token_inputs or len(token_inputs) == 0:
+ logging.info(
+ "Assumed USDC as first token. BUY transaction detected"
+ )
+ tr["token_in"] = usdcMint
+ tr["type"] = "BUY"
+ tr["amount_in"] = await calculate_price_amount(
+ token_outputs[0]
+ )
+ else:
+ token_in = token_inputs[0]
+ tr["token_in"] = token_in["mint"]
+ tr["token_in_decimals"] = get_decimals(token_in)
+ tr["amount_in"] = calculate_amount(token_in)
+
+ # if we don't have token_out yet
+ if "token_out" not in tr:
+ if not token_outputs or len(token_outputs) == 0:
+ logging.info(
+ "Assumed USDC as second token. SELL transaction detected"
+ )
+ tr["token_out"] = usdcMint
+ tr["type"] = "SELL"
+ tr["amount_out"] = await calculate_price_amount(
+ token_inputs[0]
+ )
+ else:
+ token_out = token_outputs[0]
+ tr["token_out"] = token_out["mint"]
+ tr["token_out_decimals"] = get_decimals(token_out)
+ tr["amount_out"] = calculate_amount(token_out)
- if not token_outputs or len(token_outputs) == 0:
- logging.info("Assumed USDC as second token. SELL transaction detected")
- tr["token_out"] = usdcMint
- tr["type"] = "SELL"
- tr["amount_out"] = await calculate_price_amount(token_inputs[0])
- else:
- token_out = token_outputs[0]
- tr["token_out"] = token_out["mint"]
- tr["token_out_decimals"] = get_decimals(token_out)
- tr["amount_out"] = calculate_amount(token_out)
-
# Store transaction in database
if tr["type"] in ["BUY", "SELL"]:
is_buy = tr["type"] == "BUY"
-
- transaction = storage.Transaction(
- wallet=wallet,
- transaction_type=tr["type"],
- symbol_in=tr["token_in"],
- amount_in=tr["amount_in"] if is_buy else 0,
- value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0,
- symbol_out=tr["token_out"],
- amount_out=tr["amount_out"] if not is_buy else 0,
- value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0,
- tx_signature=tx_signature
+
+ # transaction = storage.Transaction(
+ # wallet=wallet,
+ # transaction_type=tr["type"],
+ # symbol_in=tr["token_in"],
+ # amount_in=tr["amount_in"] if is_buy else 0,
+ # value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0,
+ # symbol_out=tr["token_out"],
+ # amount_out=tr["amount_out"] if not is_buy else 0,
+ # value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0,
+ # tx_signature=tx_signature
+ # )
+ # await storage.store_transaction(transaction)
+
+ if swap_event.get("nativeInput"): # SOL
+ token_in = swap_event.get("nativeInput", [])
+ logger.info(
+ f"Native input (SOL) detected ({token_in["amount"]})"
+ )
+
+ if (
+ not tr["token_in"]
+ or not tr["token_out"]
+ or tr["amount_in"] == 0
+ or tr["amount_out"] == 0
+ ):
+ logging.warning(
+ "Incomplete swap details found in logs. Getting details from transaction"
+ )
+ tx_signature = data[0].get("signature")
+ logs = data[0].get("logs", [])
+ tr = await SolanaAPI.SAPI.get_transaction_details_info(
+ tx_signature, logs
)
- await storage.store_transaction(transaction)
-
- if swap_event.get('nativeInput'): # SOL
- token_in = swap_event.get('nativeInput', [])
- logger.info(f"Native input (SOL) detected ({token_in["amount"]})")
-
- if not tr["token_in"] or not tr["token_out"] or tr["amount_in"] == 0 or tr["amount_out"] == 0:
- logging.warning("Incomplete swap details found in logs. Getting details from transaction")
- tx_signature = data[0].get('signature')
- logs = data[0].get('logs', [])
- tr = await SolanaAPI.SAPI.get_transaction_details_info(tx_signature, logs)
except Exception as e:
logging.error(f"Error loading transaction token data: {str(e)}")
-
- # ToDo - probably optimize
- tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in'])
- tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out'])
- prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']])
- tr["token_in_price"] = prices.get(tr['token_in'], 0)
- tr["token_out_price"] = prices.get(tr['token_out'], 0)
- tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in']
- tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out']
-
- notification = (
- f"Got WH notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['amount_out']} {tr['symbol_out']} ${tr['value_out_USD']}\n"
+ # ToDo - probably optimize
+ tr["symbol_in"] = await SolanaAPI.SAPI.get_token_metadata_symbol(
+ tr["token_in"]
)
- logging.info(notification)
- await utils.telegram_utils.send_telegram_message(notification)
-
+ tr["symbol_out"] = await SolanaAPI.SAPI.get_token_metadata_symbol(
+ tr["token_out"]
+ )
+ # ToDo - optimize
+ # prices = await SolanaAPI.DEX.get_token_prices(
+ # [tr["token_in"], tr["token_out"]]
+ # )
+ # tr["token_in_price"] = prices.get(tr["token_in"], 0)
+ # tr["token_out_price"] = prices.get(tr["token_out"], 0)
+ # tr["value_in_USD"] = prices.get(tr["token_in"], 0) * tr["amount_in"]
+ # tr["value_out_USD"] = prices.get(tr["token_out"], 0) * tr["amount_out"]
+
+ # notification = f"Got WH notification:: {tr['amount_in']} {tr['symbol_in'] or tr["token_in"]} swapped for {tr['amount_out']} {tr['symbol_out']} ${tr['value_out_USD']}\n"
+ # logging.info(notification)
+ # await utils.telegram_utils.send_telegram_message(notification)
+
# Store the notified transaction in the database
# storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
- copyTransaction = storage.Transaction(
- wallet=wallet,
- transaction_type=tr["type"],
- symbol_in=tr["token_in"],
- amount_in=tr["amount_in"] if is_buy else 0,
- value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0,
- symbol_out=tr["token_out"],
- amount_out=tr["amount_out"] if not is_buy else 0,
- value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0,
- tx_signature=tx_signature
- )
- try: await storage.store_transaction(copyTransaction)
- except: logging.error(traceback.format_exc())
-
+ # copyTransaction = storage.Transaction(
+ # wallet=wallet,
+ # transaction_type=tr["type"],
+ # symbol_in=tr["token_in"],
+ # amount_in=tr["amount_in"] if is_buy else 0,
+ # value_in_usd=tr.get("swap_amount_usd", 0) if is_buy else 0,
+ # symbol_out=tr["token_out"],
+ # amount_out=tr["amount_out"] if not is_buy else 0,
+ # value_out_usd=tr.get("swap_amount_usd", 0) if not is_buy else 0,
+ # tx_signature=tx_signature
+ # )
+ # try: await storage.store_transaction(copyTransaction)
+ # except: logging.error(traceback.format_exc())
+
# Attempt to execute the copytrade transaction
try:
# await SolanaAPI.SAPI.follow_move(tr)
- if on_transaction:
- await async_safe_call( on_transaction, tr)
+ if on_transaction:
+ await async_safe_call(on_transaction, tr)
else:
await SolanaAPI.SAPI.follow_move(tr)
# Store the successful copytrade transaction
@@ -325,7 +376,7 @@ def init_app(tr_handler=None):
# Store the failed copytrade transaction
# await storage.store_transaction(wallet, "SWAP_FAIL", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
logging.error(f"Copytrade transaction failed: {e}")
- # ToDo - probably optimize
+ # ToDo - probably optimize
await SolanaAPI.DEX.save_token_info()
else:
logger.info("wh transaction is not a swap. skipping...")
@@ -335,11 +386,15 @@ def init_app(tr_handler=None):
logging.error(traceback.format_exc())
def get_decimals(token_data):
- return token_data["rawTokenAmount"].get("decimals") or token_data["rawTokenAmount"].get("decimalFs", 0)
+ return token_data["rawTokenAmount"].get("decimals") or token_data[
+ "rawTokenAmount"
+ ].get("decimalFs", 0)
+
def calculate_amount(token_data):
decimals = get_decimals(token_data)
token_amount = int(token_data["rawTokenAmount"]["tokenAmount"])
return float(token_amount / 10**decimals)
+
async def calculate_price_amount(token_data, prices=None):
if not prices:
prices = await SolanaAPI.DEX.get_token_prices([token_data["mint"]])
@@ -347,19 +402,19 @@ def init_app(tr_handler=None):
token_amount = int(token_data["rawTokenAmount"]["tokenAmount"])
return prices[token_data["mint"]] * token_amount / 10**decimals
- @app.route('/replay_wh', methods=['POST'])
+ @app.route("/replay_wh", methods=["POST"])
async def replay_wh():
try:
data = request.get_json()
- filename = data.get('filename')
+ filename = data.get("filename")
if not filename:
return jsonify({"error": "Filename not provided"}), 400
- file_path = os.path.join(SolanaAPI.root_path, 'logs', filename)
+ file_path = os.path.join(SolanaAPI.root_path, "logs", filename)
if not os.path.exists(file_path):
return jsonify({"error": "File not found"}), 404
- with open(file_path, 'r') as f:
+ with open(file_path, "r") as f:
log_data = json.load(f)
await process_wh(log_data)
@@ -368,42 +423,45 @@ def init_app(tr_handler=None):
except Exception as e:
logging.error(f"Error replaying webhook file: {e}")
return jsonify({"error": "Failed to replay webhook file"}), 500
- @app.route('/retry-last-log', methods=['GET'])
+
+ @app.route("/retry-last-log", methods=["GET"])
async def retry_last_log():
- wh = request.args.get('wh', 'false').lower() == 'true'
-
- latest_log_file = get_latest_log_file(wh)
+ wh = request.args.get("wh", "false").lower() == "true"
+
+ latest_log_file = get_latest_log_file(wh)
if not latest_log_file:
return jsonify({"error": "No log files found"}), 404
try:
utils.log.info(f"Processing latest log file: {latest_log_file}")
- with open(latest_log_file, 'r') as f:
+ with open(latest_log_file, "r") as f:
log = json.load(f)
if wh:
result = await process_wh(log)
else:
result = await SolanaAPI.process_log(log)
-
- return jsonify({
- "file": latest_log_file,
- "status": "Log dump processed successfully",
- "result": result
- }), 200
+
+ return (
+ jsonify(
+ {
+ "file": latest_log_file,
+ "status": "Log dump processed successfully",
+ "result": result,
+ }
+ ),
+ 200,
+ )
except Exception as e:
utils.log.error(f"Error processing log dump: {e}")
return jsonify({"error": "Failed to process log"}), 500
-
-
-
- # # # #
+ # # # #
# AUTHENTICATION
# # # #
-
- @app.route('/login/google/authorized')
+
+ @app.route("/login/google/authorized")
def authorized():
# resp = google.authorized_response()
# if resp is None or resp.get('access_token') is None:
@@ -415,9 +473,8 @@ def init_app(tr_handler=None):
# user_info = google.get('userinfo')
# user = storage.get_or_create_user(user_info.data['email'], user_info.data['id'])
# login_user(user)
- return redirect(url_for('index'))
-
-
+ return redirect(url_for("index"))
+
class User(UserMixin):
def __init__(self, id, username, email):
self.id = id
@@ -428,88 +485,107 @@ def init_app(tr_handler=None):
def load_user(user_id):
user_data = storage.get_user_by_id(user_id)
if user_data:
- return User(id=user_data['id'], username=user_data['username'], email=user_data['email'])
+ return User(
+ id=user_data["id"],
+ username=user_data["username"],
+ email=user_data["email"],
+ )
return None
- @app.route('/')
+ @app.route("/")
def index():
- return render_template('index.html')
+ return render_template("index.html")
@login_manager.unauthorized_handler
def unauthorized():
- return redirect('/login?next=' + request.path)
+ return redirect("/login?next=" + request.path)
# return jsonify({'error': 'Unauthorized'}), 401
- @app.route('/login', methods=['GET', 'POST'])
+ @app.route("/login", methods=["GET", "POST"])
def login():
- if request.method == 'POST':
- username = request.form.get('username')
- password = request.form.get('password')
+ if request.method == "POST":
+ username = request.form.get("username")
+ password = request.form.get("password")
user = storage.authenticate_user(username, password)
if user:
- login_user(User(id=user['id'], username=user['username'], email=user['email']))
- return redirect(url_for('dashboard'))
+ login_user(
+ User(id=user["id"], username=user["username"], email=user["email"])
+ )
+ return redirect(url_for("dashboard"))
else:
- return render_template('login.html', error='Invalid credentials')
- elif request.args.get('google'):
+ return render_template("login.html", error="Invalid credentials")
+ elif request.args.get("google"):
# Uncomment the following line if Google OAuth is set up
# return google.authorize(callback=url_for('authorized', _external=True))
- return render_template('login.html', error='Google OAuth not configured')
- return render_template('login.html')
+ return render_template("login.html", error="Google OAuth not configured")
+ return render_template("login.html")
- @app.route('/logout')
+ @app.route("/logout")
@login_required
def logout():
logout_user()
- return redirect(url_for('index'))
+ return redirect(url_for("index"))
- @app.route('/dashboard')
+ @app.route("/dashboard")
@login_required
def dashboard():
- return render_template('dashboard.html')
+ return render_template("dashboard.html")
- @app.route('/generate_api_key', methods=['POST'])
+ @app.route("/generate_api_key", methods=["POST"])
@login_required
def generate_api_key():
api_key = secrets.token_urlsafe(32)
storage.store_api_key(current_user.id, api_key)
- return jsonify({'api_key': api_key})
+ return jsonify({"api_key": api_key})
- @app.route('/wallet//transactions', methods=['GET'])
+ @app.route("/wallet//transactions", methods=["GET"])
@login_required
@login_required
def get_transactions(wallet_id):
transactions = storage.get_transactions(wallet_id)
return jsonify(transactions)
- @app.route('/wallet//holdings', methods=['GET'])
+ @app.route("/wallet//holdings", methods=["GET"])
@login_required
@login_required
def get_holdings(wallet_id):
holdings = storage.get_holdings(wallet_id)
return jsonify(holdings)
-
- return app
+
+ return app
+
def teardown_app():
# Close the database connection
storage.disconnect()
+
# Function to find the latest log file
-def get_latest_log_file(wh:bool):
- log_dir = os.path.join(SolanaAPI.root_path, 'logs')
+def get_latest_log_file(wh: bool):
+ log_dir = os.path.join(SolanaAPI.root_path, "logs")
try:
# files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))]
# filter files mask log_20241005_004103_143116.json
if wh:
- files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('wh_')]
+ files = [
+ f
+ for f in os.listdir(log_dir)
+ if os.path.isfile(os.path.join(log_dir, f)) and f.startswith("wh_")
+ ]
else:
- files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('log_')]
+ files = [
+ f
+ for f in os.listdir(log_dir)
+ if os.path.isfile(os.path.join(log_dir, f)) and f.startswith("log_")
+ ]
- latest_file = max(files, key=lambda x: os.path.getmtime(os.path.join(log_dir, x)))
+ latest_file = max(
+ files, key=lambda x: os.path.getmtime(os.path.join(log_dir, x))
+ )
return os.path.join(log_dir, latest_file)
except Exception as e:
utils.log.error(f"Error fetching latest log file: {e}")
return None
+
export = init_app, teardown_app