# Source: gogo2/crypto/sol/app.py (934 lines, 40 KiB, Python)

import asyncio
import websockets
import json
from flask import Flask, render_template, request, jsonify
from solana.rpc.async_api import AsyncClient
from solana.transaction import Signature
from solana.rpc.websocket_api import connect
from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.commitment import Confirmed, Processed
from solana.transaction import Transaction
from base64 import b64decode
import base58
from solders.rpc.requests import GetTransaction
from solders.signature import Signature
from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.transaction import VersionedTransaction
from solders.transaction import Transaction
from solders.message import Message
from solders.instruction import Instruction
from solders.hash import Hash
from solders.instruction import CompiledInstruction
from solders import message
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
from dexscreener import DexscreenerClient
from telegram import Bot
from telegram.constants import ParseMode
import datetime
import logging
import base64
import os
from dotenv import load_dotenv,set_key
import aiohttp
from typing import List, Dict
import requests
import threading
import re
from typing import List, Dict, Any, Tuple
# Load variables from the dotenv file into the process environment before
# the os.getenv() calls further down read them.
load_dotenv()
# Flask application object; HTTP routes are registered on it via @app.route.
app = Flask(__name__)
# Path of the dotenv file. In the visible source it is only referenced by the
# commented-out set_key() call in save_subscription_id().
ENV_FILE = '.env'
async def save_subscription_id(subscription_id):
    """Record the id of the active websocket subscription.

    Persisting the id to the .env file is currently disabled, so the only
    effect is an info-level log entry.
    """
    # Persistence intentionally switched off:
    # set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id))
    note = f"Saved subscription ID: {subscription_id}"
    logger.info(note)
async def load_subscription_id():
    """Return the subscription id stored in the ``SUBSCRIPTION_ID`` env var.

    Returns:
        The id as an ``int``, or ``None`` when the variable is unset, empty,
        or does not contain a valid integer.
    """
    raw_value = os.getenv("SUBSCRIPTION_ID")
    if not raw_value:
        return None
    try:
        return int(raw_value)
    except ValueError:
        # A malformed value must not take down the caller; treat it as absent.
        logging.warning(f"Ignoring non-numeric SUBSCRIPTION_ID: {raw_value!r}")
        return None
# Function to find the latest log file
def get_latest_log_file(log_dir='./logs'):
    """Return the path of the most recently created ``log_*`` file.

    Only regular files whose name starts with ``log_`` are considered
    (for example ``log_20241005_004103_143116.json``).

    Args:
        log_dir: Directory to search. Defaults to ``./logs``.

    Returns:
        ``os.path.join(log_dir, name)`` for the file with the greatest
        ``os.path.getctime`` value, or ``None`` when the directory holds no
        matching file or cannot be read.
    """
    try:
        candidates = [
            os.path.join(log_dir, name)
            for name in os.listdir(log_dir)
            if name.startswith('log_')
        ]
        candidates = [path for path in candidates if os.path.isfile(path)]
        if not candidates:
            # Nothing to pick from; this is not an error condition.
            return None
        return max(candidates, key=os.path.getctime)
    except OSError as e:
        logging.error(f"Error fetching latest log file: {e}")
        return None
# Flask route to retry processing the last log
@app.route('/retry-last-log', methods=['GET'])
def retry_last_log():
    """Re-run ``process_log`` on the newest saved log dump.

    Responds with 404 when ``get_latest_log_file`` finds nothing, 500 when
    loading or processing the dump raises, and 200 otherwise.
    """
    newest_dump = get_latest_log_file()
    if not newest_dump:
        return jsonify({"error": "No log files found"}), 404

    try:
        with open(newest_dump, 'r') as handle:
            saved_log = json.load(handle)
            # This view is synchronous, so the coroutine is driven to
            # completion on an event loop created just for this call.
            asyncio.run(process_log(saved_log))
        return jsonify({"status": "Log dump processed successfully"}), 200
    except Exception as e:
        logging.error(f"Error processing log dump: {e}")
        return jsonify({"error": "Failed to process log"}), 500
# Configuration: every value below is read from the process environment
# (populated from the dotenv file by load_dotenv()). A missing variable
# yields None, except DISPLAY_CURRENCY which defaults to 'USD'.
# Telegram chat id that send_telegram_message() posts to.
DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID")
# Wallet address whose transactions are watched via logsSubscribe.
FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET")
# Wallet address whose balances are used to size the follow-up swaps.
YOUR_WALLET = os.getenv("YOUR_WALLET")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# Websocket endpoint used by subscribe_to_wallet().
SOLANA_WS_URL = os.getenv("SOLANA_WS_URL")
# HTTP JSON-RPC endpoint used by solana_client and the raw requests calls.
SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL")
# Currency code used for price lookups and in reported values.
DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD')
# Shared async RPC client bound to the configured HTTP endpoint
solana_client = AsyncClient(SOLANA_HTTP_URL)
dexscreener_client = DexscreenerClient()
# Initialize Telegram Bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)
# Token addresses (initialize with some known tokens)
# NOTE: list_initial_wallet_states() rebinds this name to a dict of balance
# info keyed by address, so its shape changes once that function has run.
# NOTE(review): the TARD address is all lowercase, unlike the other two
# entries -- confirm it is the intended mint address.
TOKEN_ADDRESSES = {
"SOL": "So11111111111111111111111111111111111111112",
"USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
"TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og",
}
async def send_telegram_message(message):
    """Post ``message`` (HTML parse mode) to the developer chat.

    Delivery failures are logged and swallowed so that a notification
    problem never interrupts the caller.
    """
    try:
        await bot.send_message(
            text=message,
            chat_id=DEVELOPER_CHAT_ID,
            parse_mode=ParseMode.HTML,
        )
    except Exception as e:
        logging.error(f"Error sending Telegram message: {str(e)}")
    else:
        logging.info(f"Telegram message sent: {message}")
async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
    """Look up a price for every address in ``token_addresses``.

    CoinGecko is queried first, DexScreener is used for whatever CoinGecko
    did not return, and anything still unpriced is reported as ``0.0`` with
    a warning. The returned dict therefore has an entry for every requested
    address.
    """
    prices = await get_prices_from_coingecko(token_addresses)
    requested = set(token_addresses)

    # Second source for tokens CoinGecko does not know about.
    not_on_coingecko = requested - set(prices.keys())
    if not_on_coingecko:
        fallback_prices = await get_prices_from_dexscreener(list(not_on_coingecko))
        prices.update(fallback_prices)

    # Whatever neither source could price is pinned to zero.
    for unpriced in requested - set(prices.keys()):
        prices[unpriced] = 0.0
        logging.warning(f"Price not found for token {unpriced}. Setting to 0.")

    return prices
async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
    """Fetch prices from CoinGecko's Solana token-price endpoint.

    Returns a dict mapping each address present in the response to its price
    in ``DISPLAY_CURRENCY``. A non-200 response is logged and produces an
    empty dict.
    """
    currency = DISPLAY_CURRENCY.lower()
    query = {
        "contract_addresses": ",".join(token_addresses),
        "vs_currencies": currency,
    }
    endpoint = "https://api.coingecko.com/api/v3/simple/token_price/solana"
    prices = {}

    async with aiohttp.ClientSession() as session:
        async with session.get(endpoint, params=query) as response:
            if response.status != 200:
                logging.error(f"Failed to get token prices from CoinGecko. Status: {response.status}")
                return prices
            payload = await response.json()
            for address, price_info in payload.items():
                # Entries without a quote in the requested currency are skipped.
                if currency in price_info:
                    prices[address] = price_info[currency]
    return prices
async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
    """Fetch USD prices for ``token_addresses`` from DexScreener.

    One request per address is issued concurrently through
    ``fetch_token_data``. The price is taken from the first pair of each
    response (the original author's note: usually the most liquid one).

    Returns:
        Dict mapping address to price. Addresses with no pairs, or whose
        first pair carries no usable ``priceUsd``, are omitted and a warning
        is logged, so one bad entry no longer aborts the whole batch.
    """
    base_url = "https://api.dexscreener.com/latest/dex/tokens/"
    prices = {}
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
        results = await asyncio.gather(*tasks)

    for address, result in zip(token_addresses, results):
        pairs = result.get('pairs') if isinstance(result, dict) else None
        if not pairs:
            logging.warning(f"No price data found on DexScreener for token {address}")
            continue
        raw_price = pairs[0].get('priceUsd')
        try:
            prices[address] = float(raw_price)
        except (TypeError, ValueError):
            # priceUsd is absent or not numeric for this pair.
            logging.warning(f"No usable USD price on DexScreener for token {address}: {raw_price!r}")
    return prices
async def fetch_token_data(session, url):
    """GET ``url`` with ``session`` and return the decoded JSON body.

    Returns ``None`` (after logging) for any non-200 status or any exception
    raised while performing the request.
    """
    try:
        async with session.get(url) as response:
            if response.status != 200:
                logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
                return None
            return await response.json()
    except Exception as e:
        logging.error(f"Error fetching data from {url}: {str(e)}")
        return None
async def get_sol_price() -> float:
    """Return the SOL price in ``DISPLAY_CURRENCY``.

    CoinGecko's simple-price endpoint is tried first; on a non-200 response
    the error is logged and the DexScreener lookup is used instead.
    """
    currency = DISPLAY_CURRENCY.lower()
    url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={currency}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}")
                return await get_sol_price_from_dexscreener()
            payload = await response.json()
            return payload['solana'][currency]
async def get_sol_price_from_dexscreener() -> float:
    """Return DexScreener's price for wrapped SOL, or ``0.0`` if unavailable."""
    # Solana's wrapped SOL address
    wrapped_sol_mint = "So11111111111111111111111111111111111111112"
    dex_prices = await get_prices_from_dexscreener([wrapped_sol_mint])
    if wrapped_sol_mint in dex_prices:
        return dex_prices[wrapped_sol_mint]
    return 0.0
async def get_token_balance_rpc(wallet_address, token_address):
    """Return the UI balance of ``token_address`` held by ``wallet_address``.

    Two JSON-RPC calls are made against ``SOLANA_HTTP_URL``:
    ``getTokenAccountsByOwner`` to find the wallet's accounts for the mint,
    then ``getTokenAccountBalance`` for the first account returned.

    Returns:
        The balance as a float, or ``0`` when the wallet has no account for
        the mint, the balance is missing, or the HTTP request fails.

    NOTE(review): ``requests`` is blocking, so this coroutine stalls the
    event loop for the duration of both calls.
    """
    url = SOLANA_HTTP_URL
    headers = {"Content-Type": "application/json"}
    accounts_request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getTokenAccountsByOwner",
        "params": [
            wallet_address,
            {"mint": token_address},
            {"encoding": "jsonParsed"},
        ],
    }
    try:
        response = requests.post(url, headers=headers, data=json.dumps(accounts_request))
        response.raise_for_status()  # Raises an error for bad responses
        accounts = response.json()

        # 'result' may be absent or null on an RPC-level error.
        token_accounts = (accounts.get('result') or {}).get('value')
        if not token_accounts:
            logging.debug(f"No account found for {token_address} in {wallet_address}")
            return 0

        first_account = token_accounts[0]['pubkey']
        balance_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "getTokenAccountBalance",
            "params": [first_account],
        }
        balance_response = requests.post(url, headers=headers, data=json.dumps(balance_request))
        balance_response.raise_for_status()
        balance = balance_response.json()

        value = (balance.get('result') or {}).get('value')
        if not value:
            logging.debug(f"No balance found for {token_address} in {wallet_address}")
            return 0

        # uiAmount can be null; fall back to the string form, then to zero.
        ui_amount = value.get('uiAmount')
        if ui_amount is None:
            ui_amount = value.get('uiAmountString') or 0
        amount = float(ui_amount)
        logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}")
        return amount
    except requests.exceptions.RequestException as e:
        logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
        return 0
async def get_token_name(mint_address):
    """Try to read a ``symbol`` from the parsed account info of a mint.

    Returns the symbol when the account value contains one, otherwise
    ``None``. Any exception is logged and also yields ``None``.

    NOTE(review): whether the value returned by the RPC client supports the
    ``'symbol' in ...`` lookup is not visible here -- confirm this ever
    returns a name.
    """
    try:
        mint_pubkey = Pubkey.from_string(mint_address)
        token_info = await solana_client.get_account_info_json_parsed(mint_pubkey)
        if token_info.value and 'symbol' in token_info.value:
            return token_info.value['symbol']
    except Exception as e:
        logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
    return None
async def get_wallet_balances(wallet_address):
    """Collect the non-zero SPL token balances and the SOL balance of a wallet.

    Returns:
        Dict keyed by mint address (plus the literal key ``'SOL'``), each
        value being ``{'name', 'address', 'amount'}``. Token amounts are the
        raw amount divided by ``10 ** decimals``; SOL is lamports / 1e9.

    Token accounts and the SOL balance are fetched in separate guarded steps
    so that a failure in one does not discard the result of the other.
    Errors are logged; whatever was collected so far is returned.
    """
    balances = {}
    logging.info(f"Getting balances for wallet: {wallet_address}")

    try:
        owner = Pubkey.from_string(wallet_address)
    except Exception as e:
        logging.error(f"Error getting wallet balances: {str(e)}")
        return balances

    try:
        response = await solana_client.get_token_accounts_by_owner_json_parsed(
            owner,
            opts=TokenAccountOpts(
                program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
            ),
            commitment=Confirmed
        )
        for account in response.value or []:
            parsed_data = account.account.data.parsed
            info = parsed_data.get('info') if isinstance(parsed_data, dict) else None
            if not (isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info):
                # Any shape other than the expected parsed token account.
                logging.warning(f"Unexpected data format for account: {account}")
                continue

            mint = info['mint']
            token_amount = info['tokenAmount']
            amount = float(token_amount['amount']) / 10 ** token_amount['decimals']
            if not amount > 0:
                continue  # empty accounts are not reported

            token_name = await get_token_name(mint) or 'Unknown'
            balances[mint] = {
                'name': token_name,
                'address': mint,
                'amount': amount
            }
            logging.debug(f"Balance for {token_name} ({mint}): {amount}")
    except Exception as e:
        logging.error(f"Error getting wallet balances: {str(e)}")

    try:
        sol_balance = await solana_client.get_balance(owner)
        if sol_balance.value is not None:
            balances['SOL'] = {
                'name': 'SOL',
                'address': 'SOL',
                'amount': sol_balance.value / 1e9
            }
        else:
            logging.warning(f"SOL balance response missing for wallet: {wallet_address}")
    except Exception as e:
        logging.error(f"Error getting wallet balances: {str(e)}")

    return balances
async def convert_balances_to_currency(balances, sol_price):
    """Return a copy of ``balances`` with a ``'value'`` added to each entry.

    SOL entries are valued with ``sol_price``; other entries use the
    module-level ``TOKEN_PRICES`` mapping. Entries without a known price get
    ``value = None`` and a warning. The input dict is not modified.
    """
    converted_balances = {}
    for address, info in balances.items():
        entry = info.copy()  # leave the caller's dict untouched
        if info['name'] == 'SOL':
            worth = info['amount'] * sol_price
        elif address in TOKEN_PRICES:
            worth = info['amount'] * TOKEN_PRICES[address]
        else:
            worth = None  # Price not available
            logging.warning(f"Price not available for token {info['name']} ({address})")
        entry['value'] = worth
        converted_balances[address] = entry
    return converted_balances
async def list_initial_wallet_states():
    """Report the starting holdings of both wallets.

    Fetches balances for the followed wallet and the own wallet, prices
    them, rebinds the module globals ``TOKEN_PRICES``, ``TOKEN_ADDRESSES``
    (to the positively valued entries of both wallets),
    ``FOLLOWED_WALLET_VALUE`` and ``YOUR_WALLET_VALUE``, then logs the
    summary and sends it via Telegram.
    """
    global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES

    def _describe(converted):
        # Build the display lines and running total for positively valued entries.
        lines = []
        total = 0
        for address, info in converted.items():
            worth = info['value']
            if worth is not None and worth > 0:
                lines.append(f"{info['name']} ({address}): {worth:.2f} {DISPLAY_CURRENCY}")
                total += worth
        return lines, total

    followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
    your_wallet_balances = await get_wallet_balances(YOUR_WALLET)

    all_token_addresses = list(set(followed_wallet_balances.keys()) | set(your_wallet_balances.keys()))
    TOKEN_PRICES = await get_token_prices(all_token_addresses)
    sol_price = await get_sol_price()

    followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price)
    your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price)

    # Entries from the own wallet win when both wallets hold the same token.
    combined = {**followed_converted_balances, **your_converted_balances}
    TOKEN_ADDRESSES = {
        address: info
        for address, info in combined.items()
        if info['value'] is not None and info['value'] > 0
    }
    logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}")

    followed_wallet_state, FOLLOWED_WALLET_VALUE = _describe(followed_converted_balances)
    your_wallet_state, YOUR_WALLET_VALUE = _describe(your_converted_balances)

    followed_lines = "\n".join(followed_wallet_state)
    your_lines = "\n".join(your_wallet_state)
    monitored_names = ', '.join([info['name'] for info in TOKEN_ADDRESSES.values()])

    message = (
        f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n"
        f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
        f"{followed_lines}\n"
        f"<b>Total Value:</b> {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
        f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
        f"{your_lines}\n"
        f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
        f"<b>Monitored Tokens:</b>\n"
        f"{monitored_names}"
    )
    logging.info(message)
    await send_telegram_message(message)
async def get_swap_transaction_details(tx_signature_str):
    """Fetch a transaction through ``solana_client`` and derive swap details.

    Returns a dict with ``order_id``, ``token_in``, ``token_out``,
    ``amount_in``, ``amount_out``, ``amount_in_USD``, ``amount_out_USD`` and
    ``percentage_swapped``, or ``None`` when parsing raises. Errors from the
    RPC call itself are not caught here.

    NOTE(review): the loop reads ``program_id`` and ``parsed`` from
    ``CompiledInstruction`` objects -- confirm those attributes exist; an
    AttributeError there is caught below and turns into a ``None`` result.
    """
    usdc_mint = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"
    token_program = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"

    t = await solana_client.get_transaction(
        Signature.from_string(tx_signature_str),
        max_supported_transaction_version=0,
    )
    try:
        parsed_result = {
            "order_id": None,
            "token_in": None,
            "token_out": None,
            "amount_in": 0,
            "amount_out": 0,
            "amount_in_USD": 0,
            "amount_out_USD": 0,
            "percentage_swapped": 0
        }
        # order_id is not extracted here; the log hook already provides it.

        # Walk the top-level instructions looking for token-program entries.
        for instruction in t.value.transaction.transaction.message.instructions:
            if not isinstance(instruction, CompiledInstruction):
                continue
            if not (instruction.program_id == Pubkey.from_string(token_program)):
                continue

            parsed_info = instruction.parsed.info
            mint = parsed_info["mint"]
            token_amount = parsed_info["tokenAmount"]
            amount = float(token_amount["amount"]) / (10 ** token_amount["decimals"])

            # The first positive amount becomes token_in, the next entry token_out.
            if parsed_result["token_in"] is None and amount > 0:
                parsed_result["token_in"] = mint
                parsed_result["amount_in"] = amount
            elif parsed_result["token_out"] is None:
                parsed_result["token_out"] = mint
                parsed_result["amount_out"] = amount

        # A USDC leg is its own USD value.
        if parsed_result["token_in"] == usdc_mint:
            parsed_result["amount_in_USD"] = parsed_result["amount_in"]
        if parsed_result["token_out"] == usdc_mint:
            parsed_result["amount_out_USD"] = parsed_result["amount_out"]

        # Ratio of output amount to input amount, in percent.
        if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0:
            ratio = parsed_result["amount_out"] / parsed_result["amount_in"]
            parsed_result["percentage_swapped"] = ratio * 100
        return parsed_result
    except Exception as e:
        logging.error(f"Error fetching transaction details: {e}")
        return None
async def get_transaction_details_with_retry(transaction_id, retry_delay = 9, max_retries = 7):
    """Poll ``get_transaction_details_rpc`` until it returns details.

    Args:
        transaction_id: Signature of the transaction to look up.
        retry_delay: Seconds to wait between attempts.
        max_retries: Maximum number of attempts.

    Returns:
        The first non-``None`` result, or ``None`` when every attempt failed
        or ``max_retries`` is not positive.
    """
    tx_details = None
    for attempt in range(1, max_retries + 1):
        tx_details = await get_transaction_details_rpc(transaction_id)
        if tx_details is not None:
            break
        logging.info(f"({attempt} of {max_retries}) Waiting for transaction details for {transaction_id}")
        # No point sleeping once the last attempt has been made.
        if attempt < max_retries:
            await asyncio.sleep(retry_delay)
    return tx_details
def _spl_token_transfer_infos(inner_instructions, allowed_types):
    """Yield the parsed ``info`` dict of each spl-token inner instruction whose type is allowed."""
    for instruction_set in inner_instructions:
        for instruction in instruction_set.get('instructions', []):
            if instruction.get('program') != 'spl-token':
                continue
            if instruction.get('parsed', {}).get('type') in allowed_types:
                yield instruction['parsed']['info']


async def get_transaction_details_rpc(tx_signature, readfromDump=False):
    """Fetch a transaction over JSON-RPC and extract swap details from it.

    Args:
        tx_signature: Transaction signature to query with ``getTransaction``.
        readfromDump: Debug switch. When true and the dump file exists, the
            raw JSON stored in ``./logs/transation_details.json`` is returned
            as-is (not the parsed result) and no RPC call is made.

    Returns:
        A dict with ``order_id``, ``token_in``, ``token_out``, ``amount_in``,
        ``amount_out``, ``amount_in_USD``, ``amount_out_USD``,
        ``percentage_swapped`` and, when the followed wallet's pre-balance
        for ``token_in`` is present, ``before_source_balance``. ``None`` when
        the RPC call returns nothing.
    """
    dump_path = './logs/transation_details.json'
    try:
        if readfromDump and os.path.exists(dump_path):
            with open(dump_path, 'r') as f:
                return json.load(f)

        transaction_details = await solana_jsonrpc("getTransaction", [tx_signature])
        if transaction_details is None:
            logging.error(f"Error fetching transaction details for {tx_signature}")
            return None

        # Keep a copy of the last fetched transaction for offline debugging.
        # This is best effort: a write failure must not lose the result.
        try:
            os.makedirs(os.path.dirname(dump_path), exist_ok=True)
            with open(dump_path, 'w') as f:
                json.dump(transaction_details, f, indent=2)
        except OSError as e:
            logging.warning(f"Could not write transaction dump {dump_path}: {e}")

        # Initialize default result structure
        parsed_result = {
            "order_id": None,
            "token_in": None,
            "token_out": None,
            "amount_in": 0,
            "amount_out": 0,
            "amount_in_USD": 0,
            "amount_out_USD": 0,
            "percentage_swapped": 0
        }
        meta = transaction_details.get("meta") or {}

        # Extract order_id from the first log line that mentions it.
        for log in meta.get("logMessages") or []:
            if "order_id" in log:
                fields = log.split(":")
                if len(fields) > 2:
                    parsed_result["order_id"] = fields[2].strip()
                break

        # First method: the first positive transferChecked gives token_in.
        inner_instructions = meta.get('innerInstructions') or []
        for info in _spl_token_transfer_infos(inner_instructions, ('transferChecked',)):
            token_amount = info['tokenAmount']
            amount = float(token_amount['amount']) / 10 ** token_amount['decimals']  # Adjust for decimals
            if parsed_result["token_in"] is None and amount > 0:
                parsed_result["token_in"] = info['mint']
                parsed_result["amount_in"] = amount

        if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
            # Second method: take the first and last spl-token transfer as the
            # two legs of the swap.
            transfers = []
            for info in _spl_token_transfer_infos(inner_instructions, ('transfer', 'transferChecked')):
                if 'amount' in info:
                    raw_amount = float(info['amount'])
                else:
                    raw_amount = float(info['tokenAmount']['amount'])
                # Plain 'transfer' carries no decimals; its amount stays raw.
                decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0
                transfers.append({
                    'mint': info.get('mint'),
                    'amount': raw_amount / (10 ** decimals),
                    'source': info['source'],
                    'destination': info['destination']
                })

            if len(transfers) >= 2:
                parsed_result["token_in"] = transfers[0]['mint']
                parsed_result["amount_in"] = transfers[0]['amount']
                parsed_result["token_out"] = transfers[-1]['mint']
                parsed_result["amount_out"] = transfers[-1]['amount']

            # If a mint is still unknown, resolve it from the source account.
            unresolved = parsed_result["token_in"] is None or parsed_result["token_out"] is None
            if transfers and unresolved:
                # Only the first and last transfer matter.
                for transfer in [transfers[0], transfers[-1]]:
                    if transfer['mint'] is not None:
                        continue
                    account_data_result = await solana_jsonrpc("getAccountInfo", [transfer['source']])
                    account_value = (account_data_result or {}).get('value') or {}
                    account_data = account_value.get('data')
                    parsed = account_data.get('parsed') if isinstance(account_data, dict) else None
                    account_info = parsed.get('info') if isinstance(parsed, dict) else None
                    if not isinstance(account_info, dict) or 'mint' not in account_info:
                        continue
                    transfer['mint'] = account_info['mint']
                    # Raw amounts from plain transfers are scaled assuming
                    # 6 decimals -- NOTE(review): not valid for every token.
                    if parsed_result["token_in"] is None:
                        parsed_result["token_in"] = transfer['mint']
                        parsed_result["amount_in"] = transfer['amount']/10**6
                    elif parsed_result["token_out"] is None:
                        parsed_result["token_out"] = transfer['mint']
                        parsed_result["amount_out"] = transfer['amount']/10**6

        # Balance the followed wallet held in token_in before the transaction.
        if parsed_result["token_in"] is not None:
            for balance in meta.get('preTokenBalances') or []:
                if balance.get('mint') == parsed_result["token_in"] and balance.get('owner') == FOLLOWED_WALLET:
                    ui_amount = balance['uiTokenAmount']
                    parsed_result["before_source_balance"] = float(ui_amount['amount']) / 10 ** ui_amount['decimals']
                    break

        # Share of the prior balance that was swapped, in percent.
        before_source_balance = parsed_result.get("before_source_balance", 0)
        if parsed_result["amount_in"] > 0 and before_source_balance > 0:
            parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / before_source_balance) * 100

        return parsed_result
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching transaction details: {e}")
        return None
async def solana_jsonrpc(method, params = None, jsonParsed = True):
    """POST a JSON-RPC request to ``SOLANA_HTTP_URL`` and return its result.

    A configuration object is appended after the caller's positional
    parameters. It always contains ``maxSupportedTransactionVersion: 0`` and,
    when ``jsonParsed`` is true, ``encoding: "jsonParsed"``. Example payload::

        {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "getTransaction",
            "params": [
                tx_signature,
                {"maxSupportedTransactionVersion": 0, "encoding": "jsonParsed"}
            ]
        }

    Args:
        method: RPC method name.
        params: Positional parameters. The list is copied, never modified.
        jsonParsed: Whether to request the ``jsonParsed`` encoding.

    Returns:
        The ``result`` member of the response, or ``None`` when the request
        fails or the response carries an error or no result.
    """
    config = {"maxSupportedTransactionVersion": 0}
    if jsonParsed:
        config["encoding"] = "jsonParsed"

    data = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        # Build a new list so the caller's params are left untouched.
        "params": [*(params or []), config],
    }
    try:
        response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data))
        response.raise_for_status()  # Raises an error for bad responses
        result = response.json()
        if 'result' not in result or 'error' in result:
            logging.error(f"Error fetching data from Solana RPC: {result}")
            return None
        return result['result']
    except Exception as e:
        logging.error(f"Error fetching data from Solana RPC: {e}")
        return None
async def save_log(log):
    """Dump ``log`` as indented JSON to ``./logs/log_<timestamp>.json``.

    The directory is created on demand. Any failure is logged and swallowed.
    """
    try:
        os.makedirs('./logs', exist_ok=True)
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        with open(f"./logs/log_{stamp}.json", 'w') as handle:
            json.dump(log, handle, indent=2)
    except Exception as e:
        logging.error(f"Error saving RPC log: {e}")
def _parse_swap_logs(logs):
    """Pull swap details out of program log lines.

    Expected shape of the relevant lines:

        "Program log: Instruction: Swap2",
        "Program log: order_id: 13985890735038016",
        "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn",   (source)
        "Program log: EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",   (target)
        "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410",
        "Program log: after_source_balance: 0, after_destination_balance: 472509072",
        "Program log: source_token_change: 58730110139, destination_token_change: 270131294",

    Returns:
        ``(details, before_source_balance, source_token_change)``. Amounts
        are divided by ``10 ** 6``, i.e. 6 token decimals are assumed.
    """
    details = {
        "order_id": None,
        "token_in": None,
        "token_out": None,
        "amount_in": 0,
        "amount_out": 0,
        "amount_in_USD": 0,
        "amount_out_USD": 0,
        "percentage_swapped": 0
    }
    before_source_balance = 0
    source_token_change = 0

    for index, entry in enumerate(logs):
        if details["order_id"] is None and "order_id" in entry:
            details["order_id"] = entry.split(":")[-1].strip()
            # The two lines after order_id carry the source and target mints.
            # If they are missing, leave the tokens unset so the caller can
            # fall back to the transaction lookup.
            if index + 2 < len(logs):
                details["token_in"] = logs[index + 1].split(":")[-1].strip()
                details["token_out"] = logs[index + 2].split(":")[-1].strip()

        if "source_token_change" in entry:
            for part in entry.split(", "):
                if "source_token_change" in part:
                    source_token_change = float(part.split(":")[-1].strip()) / 10 ** 6  # Assuming 6 decimals
                    details["amount_in"] = source_token_change
                elif "destination_token_change" in part:
                    details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6  # Assuming 6 decimals

        if "before_source_balance" in entry:
            for part in entry.split(", "):
                if "before_source_balance" in part:
                    before_source_balance = float(part.split(":")[-1].strip()) / 10 ** 6

    return details, before_source_balance, source_token_change


async def process_log(log_result):
    """Handle one ``logsSubscribe`` notification.

    Failed transactions and notifications without a swap instruction are
    ignored. For a swap, the notification is saved to disk, swap details are
    parsed from the log lines (falling back to a transaction lookup when they
    are incomplete), a Telegram message is sent and ``follow_move`` is
    invoked. Any exception raised along the way is logged and swallowed.
    """
    if log_result['value']['err']:
        return
    logs = log_result['value']['logs']
    try:
        # Detect swap operations in logs
        swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2']
        if not any(op in logs for op in swap_operations):
            return

        # Save the log to a file
        await save_log(log_result)
        tx_signature_str = log_result['value']['signature']

        tr_details, before_source_balance, source_token_change = _parse_swap_logs(logs)

        # Get details from the transaction if they were not found in the logs.
        incomplete = (
            tr_details["token_in"] is None
            or tr_details["token_out"] is None
            or tr_details["amount_in"] == 0
            or tr_details["amount_out"] == 0
        )
        if incomplete:
            logging.warning("Incomplete swap details found in logs. Getting details from transaction")
            tr_details = await get_transaction_details_info(tx_signature_str, logs)

        # Balances taken from the logs, when present, determine the percentage.
        if before_source_balance > 0 and source_token_change > 0:
            tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100

        message_text = (
            f"<b>Swap detected:</b>\n"
            f"Token In: {tr_details['token_in']}\n"
            f"Token Out: {tr_details['token_out']}\n"
            f"Amount In USD: {tr_details['amount_in_USD']}\n"
            f"Percentage Swapped: {tr_details['percentage_swapped']:.2f}%"
        )
        await send_telegram_message(message_text)
        await follow_move(tr_details)
    except Exception as e:
        logging.error(f"Error processing log: {e}")
async def get_transaction_details_info(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]:
    """Resolve swap details for a transaction and enrich them with USD values.

    ``get_swap_transaction_details`` is tried first. When it raises, returns
    ``None`` or cannot name both tokens, the retrying JSON-RPC lookup is used
    instead.

    Args:
        tx_signature_str: Signature of the transaction.
        logs: Log lines of the notification (currently unused).

    Returns:
        The details dict, with ``amount_in_USD`` / ``amount_out_USD`` (and
        the lowercase ``amount_in_usd`` / ``amount_out_usd`` written
        previously) plus ``percentage_swapped``. The percentage falls back
        to 50 when the balance before the swap is unknown.

    Raises:
        ValueError: If no transaction details could be obtained at all.
    """
    tr_info = None
    try:
        tr_info = await get_swap_transaction_details(tx_signature_str)
    except Exception as e:
        logging.error(f"Error fetching swap transaction details: {e}")

    # The fast path reports failure by returning None or leaving tokens unset.
    if not tr_info or tr_info.get('token_in') is None or tr_info.get('token_out') is None:
        tr_info = await get_transaction_details_with_retry(tx_signature_str)
    if tr_info is None:
        raise ValueError(f"No transaction details available for {tx_signature_str}")

    # Fetch token prices (only for mints that were actually identified)
    mints = [mint for mint in (tr_info.get('token_in'), tr_info.get('token_out')) if mint]
    token_prices = await get_token_prices(mints) if mints else {}

    # Calculate USD values; the *_USD keys are the ones process_log reports.
    amount_in_usd = tr_info['amount_in'] * token_prices.get(tr_info['token_in'], 0)
    amount_out_usd = tr_info['amount_out'] * token_prices.get(tr_info['token_out'], 0)
    tr_info['amount_in_usd'] = tr_info['amount_in_USD'] = amount_in_usd
    tr_info['amount_out_usd'] = tr_info['amount_out_USD'] = amount_out_usd

    # Calculate the percentage of the source balance that was swapped
    before_source_balance = tr_info.get('before_source_balance', 0)
    if before_source_balance > 0:
        tr_info['percentage_swapped'] = (tr_info['amount_in'] / before_source_balance) * 100
    else:
        tr_info['percentage_swapped'] = 50
    return tr_info
def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
    """Return the pre-transaction amount recorded for ``token``.

    Scans ``meta.preTokenBalances`` and returns the ``uiTokenAmount.amount``
    of the first entry whose mint equals ``token`` as a float (not adjusted
    for decimals), or ``0.0`` when there is no such entry.
    """
    entries = transaction_details.get('meta', {}).get('preTokenBalances', [])
    matching_amounts = (
        entry['uiTokenAmount']['amount']
        for entry in entries
        if entry['mint'] == token
    )
    first_amount = next(matching_amounts, None)
    if first_amount is None:
        return 0.0
    return float(first_amount)
async def follow_move(move):
    """Mirror a detected swap with the same percentage of the own balance.

    Looks up the own wallet's balance of ``move['token_in']``, swaps
    ``move['percentage_swapped']`` percent of it into ``move['token_out']``
    through Jupiter, then reports the outcome via Telegram. Errors during the
    swap are logged and not re-raised.

    Args:
        move: Swap details dict; reads ``token_in``, ``token_out`` and
            ``percentage_swapped``.
    """
    your_balances = await get_wallet_balances(YOUR_WALLET)
    your_balance_info = next(
        (balance for balance in your_balances.values() if balance['address'] == move['token_in']),
        None,
    )
    if not your_balance_info:
        msg = f"<b>Move Failed:</b>\nNo balance found for token {move['token_in']}"
        logging.warning(msg)
        await send_telegram_message(msg)
        return

    your_balance = your_balance_info['amount']
    token_name = your_balance_info['name']

    # Calculate the amount to swap based on the same percentage as the followed move
    amount_to_swap = your_balance * (move['percentage_swapped'] / 100)

    if your_balance >= amount_to_swap:
        async_client = None
        try:
            private_key = Keypair.from_bytes(base58.b58decode(os.getenv("PK")))
            # AsyncClient speaks HTTP JSON-RPC, so it needs the HTTP endpoint
            # rather than the websocket URL.
            async_client = AsyncClient(SOLANA_HTTP_URL)
            jupiter = Jupiter(async_client, private_key)
            transaction_data = await jupiter.swap(
                input_mint=move['token_in'],
                output_mint=move['token_out'],
                # Scaled by 1e6, i.e. a 6-decimal token is assumed.
                # NOTE(review): tokens with other decimals need a different factor.
                amount=int(amount_to_swap * 1e6),
                slippage_bps=50,  # 0.5%
            )
            raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
            signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message))
            signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
            opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
            # send the transaction
            result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
            transaction_id = json.loads(result.to_json())['result']
            logging.info(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}")

            tx_details = await get_transaction_details_with_retry(transaction_id)
            if tx_details is None:
                logging.info(f"Failed to get transaction details for {transaction_id}")
                notification = (
                    f"<b>Move Followed:</b>\n"
                    f"Swapped {amount_to_swap:.6f} {token_name} ({move['token_in']}) "
                    f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
                    f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>{transaction_id}</a>"
                )
            else:
                output_token_info = your_balances.get(move['token_out'], {'name': 'Unknown'})
                output_token_name = output_token_info['name']
                notification = (
                    f"<b>Move Followed:</b>\n"
                    f"Swapped {amount_to_swap:.6f} {token_name} ({move['token_in']}) "
                    f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
                    f"for {tx_details['amount_out']:.6f} {output_token_name} ({move['token_out']})"
                    f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>{transaction_id}</a>"
                )
            logging.info(notification)
            await send_telegram_message(notification)
        except Exception as e:
            error_message = f"<b>Swap Follow Error:</b>\n{str(e)}"
            logging.error(error_message)
            # await send_telegram_message(error_message)
        finally:
            # Release the per-call client's connection.
            if async_client is not None:
                await async_client.close()
    else:
        msg = (
            f"<b>Move Not Followed:</b>\n"
            f"Insufficient balance to swap {amount_to_swap:.6f} {token_name} ({move['token_in']})"
        )
        logging.warning(msg)
        await send_telegram_message(msg)
# Helper functions (implement these according to your needs)
async def on_logs(log):
    """Websocket notification hook: trace the payload, then process it."""
    trace_line = f"Received log: {log}"
    logging.debug(trace_line)
    await process_log(log)
async def subscribe_to_wallet():
    """Subscribe to logs mentioning ``FOLLOWED_WALLET`` and dispatch them forever.

    Opens a websocket to ``SOLANA_WS_URL``, sends a ``logsSubscribe`` request
    at ``confirmed`` commitment and forwards every notification to
    ``on_logs``. When the connection drops or an unexpected error occurs, it
    reconnects after a delay that doubles from 5 up to 60 seconds. The delay
    is reset once a subscription has been confirmed. This coroutine never
    returns.
    """
    uri = SOLANA_WS_URL  # e.g. wss://api.mainnet-beta.solana.com
    initial_reconnect_delay = 5   # Start with a 5-second delay
    max_reconnect_delay = 60      # Maximum delay of 60 seconds
    reconnect_delay = initial_reconnect_delay

    while True:
        try:
            async with websockets.connect(uri) as websocket:
                logger.info("Connected to Solana websocket")
                request = {
                    "jsonrpc": "2.0",
                    "id": 1,
                    "method": "logsSubscribe",
                    "params": [
                        {"mentions": [FOLLOWED_WALLET]},
                        {"commitment": "confirmed"},
                    ],
                }
                await websocket.send(json.dumps(request))
                logger.info("Subscription request sent")

                while True:
                    try:
                        response = await websocket.recv()
                        response_data = json.loads(response)
                        logger.debug(f"Received response: {response_data}")
                        if 'result' in response_data:
                            # Reply to the subscribe request itself.
                            subscription_id = response_data['result']
                            await save_subscription_id(subscription_id)
                            logger.info(f"Subscription successful. Subscription id: {subscription_id}")
                            await send_telegram_message("Connected to Solana network. Watching for transactions now.")
                            # The link is healthy again: start the next
                            # backoff sequence from the beginning.
                            reconnect_delay = initial_reconnect_delay
                        elif 'params' in response_data:
                            await on_logs(response_data['params']['result'])
                        else:
                            logger.warning(f"Unexpected response: {response}")
                    except websockets.exceptions.ConnectionClosedError as e:
                        logger.error(f"Connection closed unexpectedly: {e}")
                        break
                    except json.JSONDecodeError as e:
                        # A malformed frame is skipped; the connection stays up.
                        logger.error(f"Failed to decode JSON: {e}")
                    except Exception as e:
                        logger.error(f"An unexpected error occurred: {e}")
                        break
        except websockets.exceptions.WebSocketException as e:
            logger.error(f"WebSocket error: {e}")
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")

        logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
        await asyncio.sleep(reconnect_delay)
        # Implement exponential backoff
        reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)
# Module-level logger used by save_subscription_id() and
# subscribe_to_wallet(). Those functions look the name up when they run, so
# defining it after them is fine as long as it exists before they are called.
logger = logging.getLogger(__name__)
async def main():
    """Async entry point: set up logging, announce startup, start watching."""
    # Root logger at DEBUG level.
    logging.basicConfig(level=logging.DEBUG)

    startup_notice = "Solana Agent Started. Connecting to mainnet..."
    await send_telegram_message(startup_notice)

    # Initial wallet report is currently disabled:
    # await list_initial_wallet_states()
    await subscribe_to_wallet()
def run_flask():
    """Serve the Flask app on port 3001 (intended as a thread target)."""
    # The reloader is disabled so the app can run in a background thread
    # next to the asyncio main function.
    flask_options = {"debug": False, "port": 3001, "use_reloader": False}
    app.run(**flask_options)
if __name__ == '__main__':
    # Serve the Flask routes from a background thread. It is a daemon thread
    # so the process can exit when the async main function ends or is
    # interrupted.
    flask_thread = threading.Thread(target=run_flask, daemon=True)
    flask_thread.start()
    # Run the async main function on a fresh event loop. The startup sequence
    # runs exactly once: a second Flask thread would try to bind the same port.
    asyncio.run(main())