807 lines
31 KiB
Python
807 lines
31 KiB
Python
import asyncio
|
|
import websockets
|
|
import json
|
|
from flask import Flask, render_template, request, jsonify
|
|
from solana.rpc.async_api import AsyncClient
|
|
from solana.transaction import Signature
|
|
from solana.rpc.websocket_api import connect
|
|
from solana.rpc.types import TokenAccountOpts, TxOpts
|
|
from solana.rpc.commitment import Confirmed
|
|
from solders.pubkey import Pubkey
|
|
from dexscreener import DexscreenerClient
|
|
from telegram import Bot
|
|
from telegram.constants import ParseMode
|
|
import datetime
|
|
import logging
|
|
import base64
|
|
import os
|
|
from dotenv import load_dotenv,set_key
|
|
import aiohttp
|
|
from typing import List, Dict
|
|
import requests
|
|
import threading
|
|
|
|
|
|
load_dotenv()
|
|
app = Flask(__name__)
|
|
|
|
# Function to find the latest log file
|
|
def get_latest_log_file():
|
|
log_dir = './logs'
|
|
try:
|
|
# files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))]
|
|
# filter files mask log_20241005_004103_143116.json
|
|
files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('log_')]
|
|
|
|
latest_file = max(files, key=lambda x: os.path.getctime(os.path.join(log_dir, x)))
|
|
return os.path.join(log_dir, latest_file)
|
|
except Exception as e:
|
|
logging.error(f"Error fetching latest log file: {e}")
|
|
return None
|
|
|
|
# Flask route to retry processing the last log
|
|
@app.route('/retry-last-log', methods=['GET'])
|
|
def retry_last_log():
|
|
latest_log_file = get_latest_log_file()
|
|
if not latest_log_file:
|
|
return jsonify({"error": "No log files found"}), 404
|
|
|
|
try:
|
|
with open(latest_log_file, 'r') as f:
|
|
log = json.load(f)
|
|
|
|
# Run the asynchronous process_log function
|
|
asyncio.run(process_log(log))
|
|
return jsonify({"status": "Log processed successfully"}), 200
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error processing log: {e}")
|
|
return jsonify({"error": "Failed to process log"}), 500
|
|
|
|
|
|
|
|
|
|
# Configuration
|
|
DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID")
|
|
FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET")
|
|
YOUR_WALLET = os.getenv("YOUR_WALLET")
|
|
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
|
|
SOLANA_WS_URL = os.getenv("SOLANA_WS_URL")
|
|
SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL")
|
|
DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD')
|
|
|
|
|
|
# Use the production Solana RPC endpoint
|
|
solana_client = AsyncClient(SOLANA_HTTP_URL)
|
|
dexscreener_client = DexscreenerClient()
|
|
|
|
|
|
# Initialize Telegram Bot
|
|
bot = Bot(token=TELEGRAM_BOT_TOKEN)
|
|
|
|
# Token addresses (initialize with some known tokens)
|
|
TOKEN_ADDRESSES = {
|
|
"SOL": "So11111111111111111111111111111111111111112",
|
|
"USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
|
|
"TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og"
|
|
}
|
|
|
|
async def send_telegram_message(message):
|
|
try:
|
|
await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML)
|
|
logging.info(f"Telegram message sent: {message}")
|
|
except Exception as e:
|
|
logging.error(f"Error sending Telegram message: {str(e)}")
|
|
|
|
|
|
# async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
|
|
# url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
|
|
# params = {
|
|
# "contract_addresses": ",".join(token_addresses),
|
|
# "vs_currencies": DISPLAY_CURRENCY.lower()
|
|
# }
|
|
# prices = {}
|
|
|
|
# async with aiohttp.ClientSession() as session:
|
|
# async with session.get(url, params=params) as response:
|
|
# if response.status == 200:
|
|
# data = await response.json()
|
|
# for address, price_info in data.items():
|
|
# if DISPLAY_CURRENCY.lower() in price_info:
|
|
# prices[address] = price_info[DISPLAY_CURRENCY.lower()]
|
|
# else:
|
|
# logging.error(f"Failed to get token prices. Status: {response.status}")
|
|
|
|
# # For tokens not found in CoinGecko, try to get price from a DEX or set a default value
|
|
# missing_tokens = set(token_addresses) - set(prices.keys())
|
|
# for token in missing_tokens:
|
|
# # You might want to implement a fallback method here, such as:
|
|
# # prices[token] = await get_price_from_dex(token)
|
|
# # For now, we'll set a default value
|
|
# prices[token] = 0.0
|
|
# logging.warning(f"Price not found for token {token}. Setting to 0.")
|
|
|
|
# return prices
|
|
|
|
async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
|
|
coingecko_prices = await get_prices_from_coingecko(token_addresses)
|
|
|
|
# For tokens not found in CoinGecko, use DexScreener
|
|
missing_tokens = set(token_addresses) - set(coingecko_prices.keys())
|
|
if missing_tokens:
|
|
dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens))
|
|
coingecko_prices.update(dexscreener_prices)
|
|
|
|
# If any tokens are still missing, set their prices to 0
|
|
for token in set(token_addresses) - set(coingecko_prices.keys()):
|
|
coingecko_prices[token] = 0.0
|
|
logging.warning(f"Price not found for token {token}. Setting to 0.")
|
|
|
|
return coingecko_prices
|
|
|
|
async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
|
|
url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
|
|
params = {
|
|
"contract_addresses": ",".join(token_addresses),
|
|
"vs_currencies": DISPLAY_CURRENCY.lower()
|
|
}
|
|
prices = {}
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url, params=params) as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
for address, price_info in data.items():
|
|
if DISPLAY_CURRENCY.lower() in price_info:
|
|
prices[address] = price_info[DISPLAY_CURRENCY.lower()]
|
|
else:
|
|
logging.error(f"Failed to get token prices from CoinGecko. Status: {response.status}")
|
|
|
|
return prices
|
|
|
|
async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
|
|
base_url = "https://api.dexscreener.com/latest/dex/tokens/"
|
|
prices = {}
|
|
|
|
async with aiohttp.ClientSession() as session:
|
|
tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
|
|
results = await asyncio.gather(*tasks)
|
|
|
|
for address, result in zip(token_addresses, results):
|
|
if result and 'pairs' in result and result['pairs']:
|
|
pair = result['pairs'][0] # Use the first pair (usually the most liquid)
|
|
prices[address] = float(pair['priceUsd'])
|
|
else:
|
|
logging.warning(f"No price data found on DexScreener for token {address}")
|
|
|
|
return prices
|
|
|
|
async def fetch_token_data(session, url):
|
|
try:
|
|
async with session.get(url) as response:
|
|
if response.status == 200:
|
|
return await response.json()
|
|
else:
|
|
logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
|
|
return None
|
|
except Exception as e:
|
|
logging.error(f"Error fetching data from {url}: {str(e)}")
|
|
return None
|
|
|
|
async def get_sol_price() -> float:
|
|
url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}"
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
return data['solana'][DISPLAY_CURRENCY.lower()]
|
|
else:
|
|
logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}")
|
|
return await get_sol_price_from_dexscreener()
|
|
|
|
async def get_sol_price_from_dexscreener() -> float:
|
|
sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address
|
|
prices = await get_prices_from_dexscreener([sol_address])
|
|
return prices.get(sol_address, 0.0)
|
|
|
|
async def get_sol_price() -> float:
|
|
url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}"
|
|
async with aiohttp.ClientSession() as session:
|
|
async with session.get(url) as response:
|
|
if response.status == 200:
|
|
data = await response.json()
|
|
return data['solana'][DISPLAY_CURRENCY.lower()]
|
|
else:
|
|
logging.error(f"Failed to get SOL price. Status: {response.status}")
|
|
return None
|
|
|
|
async def convert_balances_to_currency(balances, token_prices, sol_price):
|
|
converted_balances = {}
|
|
for token, amount in balances.items():
|
|
if token == 'SOL':
|
|
converted_balances[token] = amount * sol_price
|
|
elif token in token_prices:
|
|
converted_balances[token] = amount * token_prices[token]
|
|
else:
|
|
converted_balances[token] = None # Price not available
|
|
logging.warning(f"Price not available for token {token}")
|
|
return converted_balances
|
|
|
|
|
|
|
|
async def get_token_balance(wallet_address, token_address):
|
|
try:
|
|
response = await solana_client.get_token_accounts_by_owner(
|
|
Pubkey.from_string(wallet_address),
|
|
{"mint": Pubkey.from_string(token_address)}
|
|
)
|
|
if response['result']['value']:
|
|
balance = await solana_client.get_token_account_balance(
|
|
response['result']['value'][0]['pubkey']
|
|
)
|
|
amount = float(balance['result']['value']['uiAmount'])
|
|
logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}")
|
|
return amount
|
|
else:
|
|
logging.debug(f"No account found for {token_address} in {wallet_address}")
|
|
return 0
|
|
except Exception as e:
|
|
logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
|
|
return 0
|
|
|
|
|
|
ENV_FILE = '.env'
|
|
|
|
async def save_subscription_id(subscription_id):
|
|
# storing subscription id in .env file disabled
|
|
#set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id))
|
|
logger.info(f"Saved subscription ID: {subscription_id}")
|
|
|
|
async def load_subscription_id():
|
|
subscription_id = os.getenv("SUBSCRIPTION_ID")
|
|
return int(subscription_id) if subscription_id else None
|
|
|
|
|
|
|
|
async def get_wallet_balances(wallet_address):
|
|
balances = {}
|
|
token_addresses = []
|
|
logging.info(f"Getting balances for wallet: {wallet_address}")
|
|
|
|
try:
|
|
response = await solana_client.get_token_accounts_by_owner_json_parsed(
|
|
Pubkey.from_string(wallet_address),
|
|
opts=TokenAccountOpts(
|
|
program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
|
|
),
|
|
commitment=Confirmed
|
|
)
|
|
|
|
if response.value:
|
|
for account in response.value:
|
|
parsed_data = account.account.data.parsed
|
|
if isinstance(parsed_data, dict) and 'info' in parsed_data:
|
|
info = parsed_data['info']
|
|
if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info:
|
|
mint = info['mint']
|
|
amount = float(info['tokenAmount']['uiAmount'])
|
|
if amount > 0:
|
|
token_addresses.append(mint)
|
|
balances[mint] = amount
|
|
logging.debug(f"Balance for {mint}: {amount}")
|
|
else:
|
|
logging.warning(f"Unexpected data format for account: {account}")
|
|
|
|
sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address))
|
|
if sol_balance.value is not None:
|
|
balances['SOL'] = sol_balance.value / 1e9
|
|
else:
|
|
logging.warning(f"SOL balance response missing for wallet: {wallet_address}")
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error getting wallet balances: {str(e)}")
|
|
|
|
return balances, token_addresses
|
|
|
|
async def get_converted_balances(wallet_address):
|
|
balances, token_addresses = await get_wallet_balances(wallet_address)
|
|
token_prices = await get_token_prices(token_addresses)
|
|
sol_price = await get_sol_price()
|
|
converted_balances = await convert_balances_to_currency(balances, token_prices, sol_price)
|
|
return converted_balances
|
|
|
|
|
|
|
|
async def send_initial_wallet_states(followed_wallet, your_wallet):
|
|
followed_balances = await get_converted_balances(followed_wallet)
|
|
your_balances = await get_converted_balances(your_wallet)
|
|
|
|
message = f"Initial Wallet States (Non-zero balances in {DISPLAY_CURRENCY}):\n\n"
|
|
message += f"Followed Wallet ({followed_wallet}):\n"
|
|
for token, amount in followed_balances.items():
|
|
if amount and amount > 0:
|
|
message += f"{token}: {amount:.2f}\n"
|
|
|
|
message += f"\nYour Wallet ({your_wallet}):\n"
|
|
for token, amount in your_balances.items():
|
|
if amount and amount > 0:
|
|
message += f"{token}: {amount:.2f}\n"
|
|
|
|
message += "\nMonitored Tokens:\n"
|
|
# Add monitored tokens logic here if needed
|
|
|
|
await bot.send_message(chat_id=CHAT_ID, text=message)
|
|
|
|
|
|
|
|
async def get_non_zero_token_balances(wallet_address):
|
|
non_zero_balances = {}
|
|
logging.info(f"Getting non-zero balances for wallet: {wallet_address}")
|
|
for token, address in TOKEN_ADDRESSES.items():
|
|
balance = await get_token_balance(wallet_address, address)
|
|
if balance > 0:
|
|
non_zero_balances[token] = address
|
|
logging.debug(f"Non-zero balance for {token}: {balance}")
|
|
return non_zero_balances
|
|
|
|
|
|
async def list_initial_wallet_states():
|
|
global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE
|
|
|
|
followed_wallet_balances, followed_token_addresses = await get_wallet_balances(FOLLOWED_WALLET)
|
|
your_wallet_balances, your_token_addresses = await get_wallet_balances(YOUR_WALLET)
|
|
|
|
all_token_addresses = list(set(followed_token_addresses + your_token_addresses))
|
|
token_prices = await get_token_prices(all_token_addresses)
|
|
sol_price = await get_sol_price()
|
|
|
|
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, token_prices, sol_price)
|
|
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, token_prices, sol_price)
|
|
|
|
TOKEN_ADDRESSES = {token: amount for token, amount in {**followed_converted_balances, **your_converted_balances}.items() if amount is not None and amount > 0}
|
|
logging.info(f"Monitoring balances for tokens: {TOKEN_ADDRESSES.keys()}")
|
|
|
|
followed_wallet_state = []
|
|
FOLLOWED_WALLET_VALUE = 0
|
|
for token, amount in followed_converted_balances.items():
|
|
if amount is not None and amount > 0:
|
|
followed_wallet_state.append(f"{token}: {amount:.2f} {DISPLAY_CURRENCY}")
|
|
FOLLOWED_WALLET_VALUE += amount
|
|
|
|
your_wallet_state = []
|
|
YOUR_WALLET_VALUE = 0
|
|
for token, amount in your_converted_balances.items():
|
|
if amount is not None and amount > 0:
|
|
your_wallet_state.append(f"{token}: {amount:.2f} {DISPLAY_CURRENCY}")
|
|
YOUR_WALLET_VALUE += amount
|
|
|
|
message = (
|
|
f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n"
|
|
f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
|
|
f"{chr(10).join(followed_wallet_state)}\n"
|
|
f"<b>Total Value:</b> {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
|
|
f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
|
|
f"{chr(10).join(your_wallet_state)}\n"
|
|
f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
|
|
f"<b>Monitored Tokens:</b>\n"
|
|
f"{', '.join(TOKEN_ADDRESSES.keys())}"
|
|
)
|
|
|
|
logging.info(message)
|
|
await send_telegram_message(message)
|
|
|
|
|
|
async def follow_move(move):
|
|
followed_balances = await get_wallet_balances(FOLLOWED_WALLET)
|
|
your_balances = await get_wallet_balances(YOUR_WALLET)
|
|
|
|
if move['token'] not in followed_balances or move['token'] not in your_balances:
|
|
logging.error(f"Invalid token: {move['token']}")
|
|
return
|
|
|
|
followed_balance = followed_balances[move['token']]
|
|
your_balance = your_balances[move['token']]
|
|
|
|
proportion = your_balance / followed_balance if followed_balance > 0 else 0
|
|
amount_to_swap = move['amount'] * proportion
|
|
|
|
if your_balance >= amount_to_swap:
|
|
# Perform the swap using Jupiter API
|
|
try:
|
|
swap_result = perform_swap(move['token'], move['to_token'], amount_to_swap)
|
|
|
|
if swap_result['success']:
|
|
message = (
|
|
f"<b>Move Followed:</b>\n"
|
|
f"Swapped {amount_to_swap:.6f} {move['token']} "
|
|
f"for {swap_result['outputAmount']:.6f} {move['to_token']}"
|
|
)
|
|
logging.info(message)
|
|
else:
|
|
message = (
|
|
f"<b>Swap Failed:</b>\n"
|
|
f"Error: {swap_result['error']}"
|
|
)
|
|
logging.warning(message)
|
|
|
|
await send_telegram_message(message)
|
|
except Exception as e:
|
|
error_message = f"<b>Swap Error:</b>\n{str(e)}"
|
|
logging.error(error_message)
|
|
await send_telegram_message(error_message)
|
|
else:
|
|
message = (
|
|
f"<b>Move Failed:</b>\n"
|
|
f"Insufficient balance to swap {amount_to_swap:.6f} {move['token']}"
|
|
)
|
|
logging.warning(message)
|
|
await send_telegram_message(message)
|
|
|
|
def perform_swap(input_token, output_token, amount):
|
|
# Jupiter API endpoint
|
|
url = "https://quote-api.jup.ag/v4/quote"
|
|
|
|
# Parameters for the API request
|
|
params = {
|
|
"inputMint": input_token,
|
|
"outputMint": output_token,
|
|
"amount": int(amount * 10**9), # Convert to lamports
|
|
"slippageBps": 50, # 0.5% slippage
|
|
}
|
|
|
|
try:
|
|
response = requests.get(url, params=params)
|
|
response.raise_for_status()
|
|
quote = response.json()
|
|
|
|
# Get the best route
|
|
route = quote['data'][0]
|
|
|
|
# Perform the swap
|
|
swap_url = "https://quote-api.jup.ag/v4/swap"
|
|
swap_data = {
|
|
"quoteResponse": route,
|
|
"userPublicKey": YOUR_WALLET,
|
|
"wrapUnwrapSOL": True
|
|
}
|
|
|
|
swap_response = requests.post(swap_url, json=swap_data)
|
|
swap_response.raise_for_status()
|
|
swap_result = swap_response.json()
|
|
|
|
# Sign and send the transaction (this part depends on your wallet setup)
|
|
# For simplicity, we'll assume the transaction is successful
|
|
return {
|
|
"success": True,
|
|
"outputAmount": float(swap_result['outputAmount']) / 10**9 # Convert from lamports
|
|
}
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
return {
|
|
"success": False,
|
|
"error": str(e)
|
|
}
|
|
|
|
from base58 import b58decode
|
|
from solders.pubkey import Pubkey
|
|
from solders.transaction import Transaction
|
|
from solders.signature import Signature
|
|
|
|
async def get_transaction_details_rpc(tx_signature, readfromDump=False):
|
|
url = SOLANA_HTTP_URL
|
|
# url = 'https://solana.drpc.org'
|
|
headers = {"Content-Type": "application/json"}
|
|
data = {
|
|
"jsonrpc": "2.0",
|
|
"id": 1,
|
|
"method": "getTransaction",
|
|
"params": [
|
|
tx_signature,
|
|
{
|
|
"encoding": "jsonParsed",
|
|
"maxSupportedTransactionVersion": 0
|
|
}
|
|
]
|
|
}
|
|
try:
|
|
if readfromDump and os.path.exists('./logs/transation_details.json'):
|
|
with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details
|
|
transaction_details = json.load(f)
|
|
return transaction_details
|
|
else:
|
|
response = requests.post(url, headers=headers, data=json.dumps(data))
|
|
response.raise_for_status() # Raises an error for bad responses
|
|
transaction_details = response.json()
|
|
with open('./logs/transation_details.json', 'w') as f:
|
|
json.dump(transaction_details, f, indent=2)
|
|
|
|
|
|
if 'result' in transaction_details:
|
|
print(transaction_details['result'])
|
|
return transaction_details['result']
|
|
else:
|
|
print("Unexpected response:", transaction_details)
|
|
except requests.exceptions.RequestException as e:
|
|
print("Error fetching transaction details:", e)
|
|
|
|
|
|
async def save_log(log):
|
|
try:
|
|
os.makedirs('./logs', exist_ok=True)
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
|
|
filename = f"./logs/log_{timestamp}.json"
|
|
|
|
with open(filename, 'w') as f:
|
|
json.dump(log, f, indent=2)
|
|
except Exception as e:
|
|
logging.error(f"Error saving RPC log: {e}")
|
|
|
|
|
|
def determine_token(pubkey, watched_tokens):
|
|
# Check if the pubkey matches any watched token addresses
|
|
for token, address in watched_tokens.items():
|
|
if pubkey == address:
|
|
return token
|
|
return "Unknown"
|
|
|
|
def parse_amount_from_logs(logs):
|
|
amount_in = 0
|
|
amount_out = 0
|
|
|
|
for log in logs:
|
|
if 'SwapEvent' in log:
|
|
# Extract amounts from the log line
|
|
parts = log.split('amount_in: ')[1].split(', amount_out: ')
|
|
amount_in = int(parts[0])
|
|
amount_out = int(parts[1].split(' ')[0])
|
|
|
|
return amount_in, amount_out
|
|
|
|
def extract_swap_details(instruction, logs, watched_tokens):
|
|
# Extract source and target tokens along with amounts
|
|
from_pubkey = instruction['accounts'][0]
|
|
to_pubkey = instruction['accounts'][1]
|
|
amount_in, amount_out = parse_amount_from_logs(logs)
|
|
return from_pubkey, to_pubkey, amount_in, amount_out
|
|
|
|
|
|
|
|
async def process_log(log_result):
|
|
if log_result['value']['err']:
|
|
return
|
|
|
|
tx_signature_str = log_result['value']['signature']
|
|
logs = log_result['value']['logs']
|
|
|
|
try:
|
|
# Detect swap operations in logs
|
|
swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2']
|
|
|
|
for log_entry in logs:
|
|
if any(op in log_entry for op in swap_operations):
|
|
try:
|
|
# ++ OLD using solana-py
|
|
# # Convert the base58 signature string to bytes
|
|
# tx_signature = Signature(b58decode(tx_signature_str))
|
|
# # Fetch transaction details
|
|
# tx_result = await solana_client.get_transaction(tx_signature, max_supported_transaction_version=0)
|
|
# #tx_result = await get_transaction_details(tx_signature_str)
|
|
# if tx_result.value is None:
|
|
# logging.error(f"Transaction not found: {tx_signature_str}")
|
|
# return
|
|
# transaction2 = tx_result.value.transaction
|
|
# -- OLD using solana-py
|
|
|
|
watched_tokens = await get_non_zero_token_balances(FOLLOWED_WALLET)
|
|
details = parse_swap_logs(logs, watched_tokens)
|
|
transaction = await get_transaction_details_rpc(tx_signature_str, True)
|
|
|
|
# instructions = transaction['transaction']['message']['instructions']
|
|
|
|
# for instruction in instructions:
|
|
# from_pubkey, to_pubkey, amount_in, amount_out = extract_swap_details(instruction, logs, watched_tokens)
|
|
|
|
# if from_pubkey in watched_tokens.values() or to_pubkey in watched_tokens.values():
|
|
# from_token = determine_token(from_pubkey, watched_tokens)
|
|
# to_token = determine_token(to_pubkey, watched_tokens)
|
|
|
|
# move = {
|
|
# 'token': from_token,
|
|
# 'amount': amount_in,
|
|
# 'to_token': to_token
|
|
# }
|
|
# message_text = (
|
|
# f"Swap detected:\n"
|
|
# f"From: {from_pubkey} ({from_token})\n"
|
|
# f"To: {to_pubkey} ({to_token})\n"
|
|
# f"Amount In: {amount_in}\n"
|
|
# f"Amount Out: {amount_out}"
|
|
# )
|
|
# await send_telegram_message(message_text)
|
|
# await follow_move(move)
|
|
|
|
tokens = []
|
|
|
|
# Check inner instructions for transfers and mints
|
|
for instruction_set in transaction.get('meta', {}).get('innerInstructions', []):
|
|
for instruction in instruction_set.get('instructions', []):
|
|
if 'parsed' in instruction and 'info' in instruction['parsed']:
|
|
info = instruction['parsed']['info']
|
|
if 'amount' in info:
|
|
amount = info['amount']
|
|
# Assume mint is available for mintTo
|
|
mint = info.get('mint', 'Unknown')
|
|
tokens.append({'amount': amount, 'mint': mint})
|
|
|
|
# Check post token balances for final token states
|
|
for balance in transaction.get('postTokenBalances', []):
|
|
amount = balance['uiTokenAmount']['amount']
|
|
mint = balance['mint']
|
|
tokens.append({'amount': amount, 'mint': mint})
|
|
|
|
# get amount_in, amount_out and token in and token out and USD value
|
|
swap_details = {
|
|
'amount_in': details['total_amount_in'],
|
|
'amount_out': details['total_amount_out'],
|
|
'tokens': tokens
|
|
}
|
|
|
|
message_text = (
|
|
f"Swap detected:\n"
|
|
f"Amount In: {swap_details['amount_in']}\n"
|
|
f"Amount Out: {swap_details['amount_out']}"
|
|
)
|
|
|
|
await send_telegram_message(message_text)
|
|
# await follow_move(move)
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error fetching transaction details: {e}")
|
|
return
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error processing log: {e}")
|
|
|
|
def parse_swap_logs(logs, watched_tokens):
|
|
total_amount_in = 0
|
|
total_amount_out = 0
|
|
token_addresses = []
|
|
|
|
for log in logs:
|
|
if "SwapEvent" in log:
|
|
parts = log.split("{ ")[1].strip(" }").split(", ")
|
|
event_details = {}
|
|
for part in parts:
|
|
key, value = part.split(": ")
|
|
event_details[key.strip()] = value.strip()
|
|
|
|
# Aggregate amounts
|
|
total_amount_in += int(event_details.get("amount_in", 0))
|
|
total_amount_out += int(event_details.get("amount_out", 0))
|
|
|
|
if "source_token_change:" in log:
|
|
# Extract final source and destination token changes
|
|
changes = log.split(", ")
|
|
for change in changes:
|
|
key_value = change.split(": ", 1)
|
|
if len(key_value) != 2:
|
|
continue
|
|
key, value = key_value
|
|
if key == "source_token_change":
|
|
total_amount_in = int(value)
|
|
elif key == "destination_token_change":
|
|
total_amount_out = int(value)
|
|
|
|
return {
|
|
"total_amount_in": total_amount_in,
|
|
"total_amount_out": total_amount_out,
|
|
}
|
|
|
|
async def on_logs(log):
|
|
logging.debug(f"Received log: {log}")
|
|
await save_log(log)
|
|
await process_log(log)
|
|
|
|
|
|
async def subscribe_to_wallet():
|
|
SOLANA_ENDPOINTS = [
|
|
"wss://api.mainnet-beta.solana.com",
|
|
"wss://solana-api.projectserum.com",
|
|
"wss://rpc.ankr.com/solana",
|
|
"wss://mainnet.rpcpool.com",
|
|
]
|
|
uri = SOLANA_WS_URL # wss://api.mainnet-beta.solana.com
|
|
reconnect_delay = 5 # Start with a 5-second delay
|
|
max_reconnect_delay = 60 # Maximum delay of 60 seconds
|
|
|
|
while True:
|
|
try:
|
|
async with websockets.connect(uri) as websocket:
|
|
logger.info("Connected to Solana websocket")
|
|
|
|
subscription_id = await load_subscription_id()
|
|
|
|
|
|
request = {
|
|
"jsonrpc": "2.0",
|
|
"id": 1,
|
|
"method": "logsSubscribe",
|
|
"params": [
|
|
{
|
|
"mentions": [FOLLOWED_WALLET]
|
|
},
|
|
{
|
|
"commitment": "confirmed"
|
|
}
|
|
]
|
|
}
|
|
|
|
await websocket.send(json.dumps(request))
|
|
logger.info("Subscription request sent")
|
|
|
|
while True:
|
|
try:
|
|
response = await websocket.recv()
|
|
response_data = json.loads(response)
|
|
logger.debug(f"Received response: {response_data}")
|
|
if 'result' in response_data:
|
|
subscription_id = response_data['result']
|
|
await save_subscription_id(subscription_id)
|
|
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
|
|
await send_telegram_message("Connected to Solana network. Watching for transactions now.")
|
|
await list_initial_wallet_states()
|
|
|
|
elif 'params' in response_data:
|
|
await on_logs(response_data['params']['result'])
|
|
else:
|
|
logger.warning(f"Unexpected response: {response}")
|
|
|
|
except websockets.exceptions.ConnectionClosedError as e:
|
|
logger.error(f"Connection closed unexpectedly: {e}")
|
|
break
|
|
except json.JSONDecodeError as e:
|
|
logger.error(f"Failed to decode JSON: {e}")
|
|
except Exception as e:
|
|
logger.error(f"An unexpected error occurred: {e}")
|
|
break
|
|
|
|
except websockets.exceptions.WebSocketException as e:
|
|
logger.error(f"WebSocket error: {e}")
|
|
except Exception as e:
|
|
logger.error(f"An unexpected error occurred: {e}")
|
|
|
|
logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
|
|
await asyncio.sleep(reconnect_delay)
|
|
|
|
# Implement exponential backoff
|
|
reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def main():
|
|
# Initialize logging
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
|
|
# await subscribe_to_wallet()
|
|
|
|
def run_flask():
|
|
# Run Flask app without the reloader, so we can run the async main function
|
|
app.run(debug=False, port=3001, use_reloader=False)
|
|
|
|
if __name__ == '__main__':
|
|
# Start Flask in a separate thread
|
|
flask_thread = threading.Thread(target=run_flask)
|
|
flask_thread.start()
|
|
|
|
# Create an event loop for the async tasks
|
|
loop = asyncio.get_event_loop()
|
|
loop.run_until_complete(main())
|
|
# Start Flask in a separate thread
|
|
flask_thread = threading.Thread(target=run_flask)
|
|
flask_thread.start()
|
|
|
|
# Run the async main function
|
|
asyncio.run(main())
|