744 lines
30 KiB
Python
744 lines
30 KiB
Python
import asyncio
|
|
import websockets
|
|
import json
|
|
from flask import Flask, render_template, request, jsonify
|
|
from solana.rpc.async_api import AsyncClient
|
|
from solana.transaction import Signature
|
|
from solana.rpc.websocket_api import connect
|
|
from solana.rpc.types import TokenAccountOpts, TxOpts
|
|
from solana.rpc.commitment import Confirmed
|
|
from base58 import b58decode
|
|
from solders.signature import Signature
|
|
from solders.pubkey import Pubkey
|
|
from solders.keypair import Keypair
|
|
from solders.transaction import VersionedTransaction
|
|
from solders.transaction import Transaction
|
|
from solders.message import Message
|
|
from solders.instruction import Instruction
|
|
from solders.hash import Hash
|
|
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
|
|
from dexscreener import DexscreenerClient
|
|
from telegram import Bot
|
|
from telegram.constants import ParseMode
|
|
import datetime
|
|
import logging
|
|
import base64
|
|
import os
|
|
from dotenv import load_dotenv,set_key
|
|
import aiohttp
|
|
from typing import List, Dict
|
|
import requests
|
|
import threading
|
|
import re
|
|
|
|
# Load the dotenv file into the process environment; the os.getenv() lookups
# further down in this module run after this call.
load_dotenv()

# Flask application object that the HTTP routes in this module attach to.
app = Flask(__name__)

# Dotenv file name (in the visible source only the disabled set_key call uses it).
ENV_FILE = ".env"
|
|
|
|
async def save_subscription_id(subscription_id):
    """Record the subscription id returned by the websocket endpoint.

    Writing the id into the .env file is switched off, so the only effect
    of this coroutine is a log record.
    """
    # Persisting into the .env file is disabled:
    # set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id))
    saved_notice = f"Saved subscription ID: {subscription_id}"
    logger.info(saved_notice)
|
|
|
|
async def load_subscription_id():
    """Return the SUBSCRIPTION_ID environment value as an int.

    Returns None when the variable is unset or empty. A non-numeric value
    makes ``int()`` raise ValueError.
    """
    raw_value = os.getenv("SUBSCRIPTION_ID")
    if not raw_value:
        return None
    return int(raw_value)
|
|
|
|
|
|
|
|
def get_latest_log_file(log_dir='./logs'):
    """Return the path of the most recently created ``log_*`` file.

    Args:
        log_dir: Directory to scan. Defaults to ``./logs``, the directory
            the original implementation always used.

    Returns:
        The joined path of the newest regular file whose name starts with
        ``log_`` (e.g. ``log_20241005_004103_143116.json``), or None when
        the directory cannot be read or holds no such file.
    """
    try:
        candidates = [
            os.path.join(log_dir, name)
            for name in os.listdir(log_dir)
            if name.startswith('log_')
        ]
        candidates = [path for path in candidates if os.path.isfile(path)]
        if not candidates:
            # An empty directory is an expected state, not an error; avoid
            # relying on max() raising ValueError to signal it.
            logging.warning(f"No log files found in {log_dir}")
            return None
        return max(candidates, key=os.path.getctime)
    except OSError as e:
        # Missing/unreadable directory, or a file vanished while scanning.
        logging.error(f"Error fetching latest log file: {e}")
        return None
|
|
|
|
# HTTP endpoint that replays the newest saved log through process_log.
@app.route('/retry-last-log', methods=['GET'])
def retry_last_log():
    """Re-process the most recent saved log file.

    Returns:
        A (JSON response, status) pair: 404 when no log file is found,
        500 when loading or processing raises, 200 otherwise.
    """
    newest_log_path = get_latest_log_file()
    if not newest_log_path:
        return jsonify({"error": "No log files found"}), 404

    try:
        with open(newest_log_path, 'r') as log_handle:
            saved_log = json.load(log_handle)

        # process_log is a coroutine; run it to completion on its own loop.
        replay = process_log(saved_log)
        asyncio.run(replay)
        return jsonify({"status": "Log processed successfully"}), 200
    except Exception as exc:
        logging.error(f"Error processing log: {exc}")
        return jsonify({"error": "Failed to process log"}), 500
|
|
|
|
|
|
|
|
|
|
# Runtime settings, all read from the environment.
DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID")
FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET")
YOUR_WALLET = os.getenv("YOUR_WALLET")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
SOLANA_WS_URL = os.getenv("SOLANA_WS_URL")
SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL")
DISPLAY_CURRENCY = os.getenv("DISPLAY_CURRENCY", "USD")

# Shared RPC client for the endpoint configured in SOLANA_HTTP_URL.
solana_client = AsyncClient(SOLANA_HTTP_URL)
dexscreener_client = DexscreenerClient()

# Telegram bot used for all outgoing notifications.
bot = Bot(token=TELEGRAM_BOT_TOKEN)

# Seed table of known token addresses; list_initial_wallet_states rebinds
# this global once wallet balances have been read.
TOKEN_ADDRESSES = {
    "SOL": "So11111111111111111111111111111111111111112",
    "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
    "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og",
}
|
|
|
|
async def send_telegram_message(message):
    """Send ``message`` to the developer chat using HTML parse mode.

    Best effort: any exception is logged and swallowed so a notification
    failure never interrupts the caller.
    """
    try:
        await bot.send_message(
            chat_id=DEVELOPER_CHAT_ID,
            text=message,
            parse_mode=ParseMode.HTML,
        )
        logging.info(f"Telegram message sent: {message}")
    except Exception as exc:
        logging.error(f"Error sending Telegram message: {str(exc)}")
|
|
|
|
|
|
async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
    """Look up a price for every address in ``token_addresses``.

    CoinGecko is asked first; addresses it does not return are retried on
    DexScreener; whatever is still missing is reported with a warning and
    priced at 0.0.

    Returns:
        A dict containing an entry for each requested address.
    """
    prices = await get_prices_from_coingecko(token_addresses)
    requested = set(token_addresses)

    # Second source for anything CoinGecko did not return.
    unresolved = requested.difference(prices.keys())
    if unresolved:
        fallback_prices = await get_prices_from_dexscreener(list(unresolved))
        prices.update(fallback_prices)

    # Neither source knew these tokens: default to zero.
    for address in requested.difference(prices.keys()):
        prices[address] = 0.0
        logging.warning(f"Price not found for token {address}. Setting to 0.")

    return prices
|
|
|
|
async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
    """Fetch token prices in DISPLAY_CURRENCY from CoinGecko.

    Args:
        token_addresses: Solana token contract addresses.

    Returns:
        Mapping of address (as keyed by the CoinGecko response) to price.
        Addresses CoinGecko does not return are simply absent. An empty
        dict is returned for empty input, a non-200 status, or a
        network/decoding failure, so callers can fall back to another
        price source instead of crashing.
    """
    prices: Dict[str, float] = {}
    if not token_addresses:
        # Nothing to look up; skip the HTTP round trip.
        return prices

    currency = DISPLAY_CURRENCY.lower()
    url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
    params = {
        "contract_addresses": ",".join(token_addresses),
        "vs_currencies": currency,
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    logging.error(f"Failed to get token prices from CoinGecko. Status: {response.status}")
                    return prices
                data = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
        # Connection problems or an undecodable body must not abort the
        # caller's fallback chain.
        logging.error(f"Error fetching token prices from CoinGecko: {e}")
        return prices

    for address, price_info in data.items():
        if isinstance(price_info, dict) and currency in price_info:
            prices[address] = price_info[currency]

    return prices
|
|
|
|
async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
    """Fetch token prices from DexScreener, one request per address.

    The value read is the ``priceUsd`` field of the first pair in each
    response, so the result is in USD whatever DISPLAY_CURRENCY is set to.

    Args:
        token_addresses: Token addresses to look up.

    Returns:
        Mapping of address to price. Addresses with no pairs, or whose
        first pair has no usable ``priceUsd``, are omitted after a warning.
    """
    prices: Dict[str, float] = {}
    if not token_addresses:
        return prices

    base_url = "https://api.dexscreener.com/latest/dex/tokens/"
    async with aiohttp.ClientSession() as session:
        # All lookups run concurrently on one shared session.
        results = await asyncio.gather(
            *(fetch_token_data(session, f"{base_url}{address}") for address in token_addresses)
        )

    for address, result in zip(token_addresses, results):
        pairs = result.get('pairs') if isinstance(result, dict) else None
        price = None
        if pairs:
            # Use the first pair (usually the most liquid).
            try:
                price = float(pairs[0].get('priceUsd'))
            except (TypeError, ValueError, AttributeError):
                # priceUsd missing, null or malformed: treat as no price.
                price = None
        if price is None:
            logging.warning(f"No price data found on DexScreener for token {address}")
        else:
            prices[address] = price

    return prices
|
|
|
|
async def fetch_token_data(session, url):
    """GET ``url`` through ``session`` and return the decoded JSON body.

    Returns None, after logging an error, when the status is not 200 or
    when the request or decoding raises.
    """
    try:
        async with session.get(url) as response:
            if response.status != 200:
                logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
                return None
            return await response.json()
    except Exception as exc:
        logging.error(f"Error fetching data from {url}: {str(exc)}")
        return None
|
|
|
|
async def get_sol_price() -> float:
    """Return the SOL price in DISPLAY_CURRENCY from CoinGecko.

    On a non-200 response the error is logged and the DexScreener price is
    returned instead.

    NOTE: this module defines ``get_sol_price`` a second time further down;
    the later definition is the one that stays bound to the name.
    """
    currency = DISPLAY_CURRENCY.lower()
    url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={currency}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}")
                return await get_sol_price_from_dexscreener()
            payload = await response.json()
            return payload['solana'][currency]
|
|
|
|
async def get_sol_price_from_dexscreener() -> float:
    """Return the DexScreener price for the wrapped SOL address, or 0.0 if absent."""
    # Solana's wrapped SOL address.
    wrapped_sol_address = "So11111111111111111111111111111111111111112"
    dex_prices = await get_prices_from_dexscreener([wrapped_sol_address])
    if wrapped_sol_address in dex_prices:
        return dex_prices[wrapped_sol_address]
    return 0.0
|
|
|
|
async def get_sol_price() -> float:
    """Return the SOL price, preferring CoinGecko and falling back to DexScreener.

    CoinGecko is queried in DISPLAY_CURRENCY. If it answers with a non-200
    status, cannot be reached, or returns an unexpected payload, the price
    from ``get_sol_price_from_dexscreener`` is returned instead, so the
    function always yields a number as its ``-> float`` annotation states
    (it previously returned None on failure, which made callers that
    multiply by the price raise TypeError).
    """
    currency = DISPLAY_CURRENCY.lower()
    url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={currency}"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    return data['solana'][currency]
                logging.error(f"Failed to get SOL price. Status: {response.status}")
    except (aiohttp.ClientError, asyncio.TimeoutError, KeyError, ValueError) as e:
        logging.error(f"Error getting SOL price from CoinGecko: {e}")

    # CoinGecko gave no usable answer; use the secondary source.
    return await get_sol_price_from_dexscreener()
|
|
|
|
async def convert_balances_to_currency(balances, token_prices, sol_price):
    """Convert token amounts into currency values.

    Args:
        balances: Mapping of token key to numeric amount. The key ``'SOL'``
            is priced with ``sol_price``; every other key is looked up in
            ``token_prices``.
        token_prices: Mapping of token key to unit price.
        sol_price: Unit price of SOL, or None when it is unknown.

    Returns:
        Mapping of token key to ``amount * price``. Tokens whose price is
        missing or None map to None and a warning is logged, instead of
        the multiplication raising TypeError.
    """
    converted_balances = {}
    for token, amount in balances.items():
        price = sol_price if token == 'SOL' else token_prices.get(token)
        if price is None:
            converted_balances[token] = None  # Price not available
            logging.warning(f"Price not available for token {token}")
        else:
            converted_balances[token] = amount * price
    return converted_balances
|
|
|
|
|
|
async def get_token_balance_rpc(wallet_address, token_address):
    """Return the wallet's balance of one token via raw JSON-RPC calls.

    Calls ``getTokenAccountsByOwner`` filtered by mint, then
    ``getTokenAccountBalance`` on the first account returned. Only that
    first account is read, as in the original implementation.

    Args:
        wallet_address: Owner address as a string.
        token_address: Mint address as a string.

    Returns:
        The ``uiAmount`` as a float, or 0 when no account or balance is
        found, the amount is null, or the HTTP request fails.
    """
    url = SOLANA_HTTP_URL
    headers = {"Content-Type": "application/json"}

    def _rpc(method, params):
        # Blocking JSON-RPC POST; always invoked through the executor below
        # so the event loop is not stalled while waiting on the network.
        payload = {"jsonrpc": "2.0", "id": 1, "method": method, "params": params}
        response = requests.post(url, headers=headers, data=json.dumps(payload), timeout=30)
        response.raise_for_status()  # Raises an error for bad responses
        return response.json()

    loop = asyncio.get_running_loop()
    try:
        accounts = await loop.run_in_executor(
            None,
            _rpc,
            "getTokenAccountsByOwner",
            [wallet_address, {"mint": token_address}, {"encoding": "jsonParsed"}],
        )
        token_accounts = (accounts.get('result') or {}).get('value')
        if not token_accounts:
            logging.debug(f"No account found for {token_address} in {wallet_address}")
            return 0

        first_account = token_accounts[0]['pubkey']
        balance = await loop.run_in_executor(
            None, _rpc, "getTokenAccountBalance", [first_account]
        )
        value = (balance.get('result') or {}).get('value')
        if not value:
            logging.debug(f"No balance found for {token_address} in {wallet_address}")
            return 0

        # A null uiAmount would make float() raise; count it as zero.
        amount = float(value.get('uiAmount') or 0)
        logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}")
        return amount
    except requests.exceptions.RequestException as e:
        logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
        return 0
|
|
|
|
|
|
|
|
|
|
async def get_token_name(mint_address):
    """Try to read a ``'symbol'`` entry from the mint's token-supply response.

    Returns:
        The symbol when the response value contains one, otherwise None.
        Any exception is logged and also results in None.
    """
    # NOTE(review): confirm that the token-supply response actually carries
    # a symbol; if it does not, this always falls through to None.
    try:
        mint_key = Pubkey.from_string(mint_address)
        supply_response = await solana_client.get_token_supply(mint_key)
        if supply_response.value and 'symbol' in supply_response.value:
            return supply_response.value['symbol']
    except Exception as exc:
        logging.error(f"Error fetching token name for {mint_address}: {str(exc)}")
    return None
|
|
|
|
async def get_wallet_balances(wallet_address):
    """Collect the positive token balances and the SOL balance of a wallet.

    Args:
        wallet_address: Wallet address as a string.

    Returns:
        Dict keyed by mint address (plus the literal key ``'SOL'``), each
        value being ``{'name', 'address', 'amount'}``. Token accounts with
        a zero or null ``uiAmount`` are skipped. If an RPC call raises, the
        error is logged and whatever was collected so far is returned.
    """
    balances = {}
    logging.info(f"Getting balances for wallet: {wallet_address}")

    try:
        owner = Pubkey.from_string(wallet_address)
        response = await solana_client.get_token_accounts_by_owner_json_parsed(
            owner,
            opts=TokenAccountOpts(
                program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
            ),
            commitment=Confirmed,
        )

        for account in response.value or []:
            parsed_data = account.account.data.parsed
            info = parsed_data.get('info') if isinstance(parsed_data, dict) else None
            if not (isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info):
                logging.warning(f"Unexpected data format for account: {account}")
                continue

            mint = info['mint']
            # A null uiAmount used to raise inside float() and abort the
            # whole scan (SOL balance included); count it as zero instead.
            amount = float(info['tokenAmount'].get('uiAmount') or 0)
            if amount <= 0:
                continue

            token_name = await get_token_name(mint) or 'Unknown'
            balances[mint] = {
                'name': token_name,
                'address': mint,
                'amount': amount,
            }
            logging.debug(f"Balance for {token_name} ({mint}): {amount}")

        sol_balance = await solana_client.get_balance(owner)
        if sol_balance.value is not None:
            balances['SOL'] = {
                'name': 'SOL',
                'address': 'SOL',
                # The RPC value is divided by 1e9 to express it in SOL.
                'amount': sol_balance.value / 1e9,
            }
        else:
            logging.warning(f"SOL balance response missing for wallet: {wallet_address}")

    except Exception as e:
        logging.error(f"Error getting wallet balances: {str(e)}")

    return balances
|
|
|
|
def _wallet_state_lines(wallet_balances, converted_balances):
    """Return (display lines, total value) for the positively valued tokens of one wallet."""
    lines = []
    total = 0
    for token, value in converted_balances.items():
        if value is None or value <= 0:
            continue
        info = wallet_balances[token]
        lines.append(f"{info['name']} ({info['address']}): {value:.2f} {DISPLAY_CURRENCY}")
        total += value
    return lines, total


async def list_initial_wallet_states():
    """Report the starting balances of the followed wallet and your wallet.

    Reads both wallets, prices every held token, rebinds the module
    globals ``TOKEN_ADDRESSES`` (token key -> converted value, positive
    entries only), ``FOLLOWED_WALLET_VALUE`` and ``YOUR_WALLET_VALUE``,
    then logs the summary and sends it to Telegram.

    The original body was pasted twice with incompatible data shapes, and
    its second ``global`` statement after the assignments is a SyntaxError;
    this is the single consolidated version.
    """
    global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE

    followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
    your_wallet_balances = await get_wallet_balances(YOUR_WALLET)

    # 'SOL' is a placeholder key rather than a mint address and is priced
    # separately, so keep it out of the token price lookup.
    all_token_addresses = [
        token
        for token in set(followed_wallet_balances) | set(your_wallet_balances)
        if token != 'SOL'
    ]
    token_prices = await get_token_prices(all_token_addresses) if all_token_addresses else {}

    sol_price = await get_sol_price()
    if sol_price is None:
        # Keep the report alive; a zero value is filtered out of the lists.
        logging.warning("SOL price unavailable; SOL holdings are left out of the totals")
        sol_price = 0.0

    # get_wallet_balances returns records; the converter works on amounts.
    followed_converted_balances = await convert_balances_to_currency(
        {token: info['amount'] for token, info in followed_wallet_balances.items()},
        token_prices,
        sol_price,
    )
    your_converted_balances = await convert_balances_to_currency(
        {token: info['amount'] for token, info in your_wallet_balances.items()},
        token_prices,
        sol_price,
    )

    TOKEN_ADDRESSES = {
        token: value
        for token, value in {**followed_converted_balances, **your_converted_balances}.items()
        if value is not None and value > 0
    }
    known_tokens = {**followed_wallet_balances, **your_wallet_balances}
    monitored_names = [known_tokens[token]['name'] for token in TOKEN_ADDRESSES]
    logging.info(f"Monitoring balances for tokens: {monitored_names}")

    followed_wallet_state, FOLLOWED_WALLET_VALUE = _wallet_state_lines(
        followed_wallet_balances, followed_converted_balances
    )
    your_wallet_state, YOUR_WALLET_VALUE = _wallet_state_lines(
        your_wallet_balances, your_converted_balances
    )

    message = (
        f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n"
        f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
        f"{chr(10).join(followed_wallet_state)}\n"
        f"<b>Total Value:</b> {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
        f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
        f"{chr(10).join(your_wallet_state)}\n"
        f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
        f"<b>Monitored Tokens:</b>\n"
        f"{', '.join(monitored_names)}"
    )

    logging.info(message)
    await send_telegram_message(message)
|
|
|
|
|
|
async def get_transaction_details_rpc(tx_signature, readfromDump=False):
    """Fetch the parsed details of a transaction via ``getTransaction``.

    Args:
        tx_signature: Transaction signature string.
        readfromDump: When true and ``./logs/transation_details.json``
            exists, load the last dumped response from that file instead
            of calling the RPC endpoint.

    Returns:
        The ``result`` member of the JSON-RPC response, or None when the
        request fails or the response has no ``result``. Both the network
        path and the dump path return this same shape (the dump path used
        to return the whole response envelope).
    """
    # File name kept exactly as spelled so existing dumps remain readable.
    dump_path = './logs/transation_details.json'

    try:
        if readfromDump and os.path.exists(dump_path):
            with open(dump_path, 'r') as f:
                transaction_details = json.load(f)
        else:
            payload = {
                "jsonrpc": "2.0",
                "id": 1,
                "method": "getTransaction",
                "params": [
                    tx_signature,
                    {
                        "encoding": "jsonParsed",
                        "maxSupportedTransactionVersion": 0,
                    },
                ],
            }

            def _fetch():
                # Blocking HTTP call; run in the executor so the event
                # loop keeps servicing the websocket meanwhile.
                response = requests.post(
                    SOLANA_HTTP_URL,
                    headers={"Content-Type": "application/json"},
                    data=json.dumps(payload),
                    timeout=30,
                )
                response.raise_for_status()  # Raises an error for bad responses
                return response.json()

            transaction_details = await asyncio.get_running_loop().run_in_executor(None, _fetch)

            # Keep the latest response on disk for offline replays.
            os.makedirs(os.path.dirname(dump_path), exist_ok=True)
            with open(dump_path, 'w') as f:
                json.dump(transaction_details, f, indent=2)
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching transaction details: {e}")
        return None

    if 'result' in transaction_details:
        logging.debug(f"Transaction details: {transaction_details['result']}")
        return transaction_details['result']

    logging.warning(f"Unexpected response: {transaction_details}")
    return None
|
|
|
|
|
|
async def save_log(log):
    """Write ``log`` as indented JSON to ``./logs/log_<timestamp>.json``.

    The timestamp comes from the local clock and includes microseconds.
    Any failure is logged rather than raised.
    """
    try:
        os.makedirs('./logs', exist_ok=True)
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        target_path = f"./logs/log_{stamp}.json"
        with open(target_path, 'w') as out_file:
            json.dump(log, out_file, indent=2)
    except Exception as exc:
        logging.error(f"Error saving RPC log: {exc}")
|
|
|
|
|
|
|
|
async def process_log(log_result):
    """React to one logs notification: detect a swap, report it and mirror it.

    Args:
        log_result: Notification payload whose ``'value'`` holds ``'err'``
            and ``'logs'`` (the list of program log lines).

    Failed transactions and transactions without a swap instruction are
    ignored. A detected swap is parsed, announced on Telegram and passed
    to ``follow_move`` exactly once per transaction, however many log
    lines mention a swap. Errors are logged, never raised.
    """
    value = log_result['value']
    if value['err']:
        return

    logs = value['logs']
    # "Swap2" lines also contain the first marker; both are listed for clarity.
    swap_operations = ('Program log: Instruction: Swap', 'Program log: Instruction: Swap2')

    try:
        is_swap = any(op in log_entry for log_entry in logs for op in swap_operations)
        if not is_swap:
            return

        try:
            # parse_swap_logs consumes the whole log list, so one call
            # covers the transaction.
            details = await parse_swap_logs(logs)

            message_text = (
                f"Swap detected:\n"
                f"Order ID: {details['order_id']}\n"
                f"Token In: {details['token_in']}\n"
                f"Token Out: {details['token_out']}\n"
                f"Amount In USD: {details['amount_in_USD']}\n"
                f"Percentage Swapped: {details['percentage_swapped']:.2f}%"
            )

            await send_telegram_message(message_text)
            await follow_move(details)
        except Exception as e:
            logging.error(f"Error fetching transaction details: {e}")
    except Exception as e:
        logging.error(f"Error processing log: {e}")
|
|
|
|
|
|
|
|
# "Program log: Instruction: Swap2",
|
|
# "Program log: order_id: 13985890735038016",
|
|
# "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn", source
|
|
# "Program log: EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", target
|
|
# "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410",
|
|
# "Program log: after_source_balance: 0, after_destination_balance: 472509072",
|
|
# "Program log: source_token_change: 58730110139, destination_token_change: 270131294",
|
|
# A base58 string of plausible public-key length (32-44 characters).
_MINT_ADDRESS_RE = re.compile(r"[1-9A-HJ-NP-Za-km-z]{32,44}")


async def parse_swap_logs(logs):
    """Extract swap details from a transaction's program log lines.

    Recognised lines: ``order_id: N``; two bare address lines (source mint
    then destination mint); a line carrying ``before_source_balance``,
    ``amount_in`` and ``expect_amount_out``; and a line carrying
    ``source_token_change`` / ``destination_token_change``. A ``Swap2``
    instruction line resets the mint slots so the addresses that follow
    it win.

    Args:
        logs: List of log line strings.

    Returns:
        Dict with ``order_id``, ``token_in``, ``token_out``, ``amount_in``,
        ``amount_out_expected``, ``amount_out_actual``, ``amount_in_USD``,
        ``amount_out_USD`` and ``percentage_swapped``.

    Raises:
        ValueError: If the logs do not contain both mint addresses.
    """
    token_in = None
    token_out = None
    amount_in = 0
    amount_out_expected = 0
    amount_out_actual = 0
    order_id = None
    before_source_balance = 0

    for log in logs:
        if "Program log:" not in log:
            continue

        payload = log.split("Program log: ")[-1].strip()
        if "order_id:" in log:
            order_id = log.split("order_id: ")[-1].strip()
        elif "Swap2" in log:
            token_in = None
            token_out = None
        elif _MINT_ADDRESS_RE.fullmatch(payload):
            # Only address-shaped payloads may fill the mint slots;
            # previously any other program log line (for example
            # "Instruction: Swap") could be taken as a token.
            if not token_in:
                token_in = payload
            elif not token_out:
                token_out = payload

        balance_match = re.search(r"before_source_balance: (\d+)", log)
        if balance_match:
            before_source_balance = int(balance_match.group(1))

        if "amount_in" in log or "amount_out" in log:
            # "expect_amount_out: N" is matched through its "amount_out: N" tail.
            for amount_type, value in re.findall(r"(amount_in|amount_out): (\d+)", log):
                if amount_type == "amount_in":
                    amount_in = int(value)
                elif amount_type == "amount_out":
                    amount_out_expected = int(value)
        elif "source_token_change:" in log or "destination_token_change:" in log:
            source_match = re.search(r"source_token_change: (\d+)", log)
            destination_match = re.search(r"destination_token_change: (\d+)", log)
            if source_match:
                amount_in = int(source_match.group(1))
            if destination_match:
                amount_out_actual = int(destination_match.group(1))

    if not token_in or not token_out:
        raise ValueError("Could not identify swap token mints in transaction logs")

    token_prices = await get_token_prices([token_in, token_out])
    # NOTE(review): raw amounts are scaled by 1e6, i.e. six decimals are
    # assumed for every token; confirm for mints with other precision.
    amount_in_usd = amount_in / 1e6 * token_prices.get(token_in, 0)
    amount_out_usd = amount_out_actual / 1e6 * token_prices.get(token_out, 0)

    # Share of the source balance that was swapped.
    percentage_swapped = (amount_in / before_source_balance) * 100 if before_source_balance > 0 else 0

    return {
        "order_id": order_id,
        "token_in": token_in,
        "token_out": token_out,
        "amount_in": amount_in / 1e6,
        "amount_out_expected": amount_out_expected / 1e6,
        "amount_out_actual": amount_out_actual / 1e6,
        "amount_in_USD": amount_in_usd,
        "amount_out_USD": amount_out_usd,
        "percentage_swapped": percentage_swapped,
    }
|
|
|
|
async def follow_move(move):
    """Mirror a detected swap from your own wallet through Jupiter.

    Swaps the same percentage of your ``token_in`` balance that the
    followed wallet swapped, then reports the outcome on Telegram.

    Args:
        move: Dict produced by ``parse_swap_logs``; ``token_in``,
            ``token_out`` and ``percentage_swapped`` are read.

    Every failure (no balance, missing key, RPC or Jupiter error) is
    logged and reported on Telegram; nothing is raised to the caller.
    """
    # Names the signing flow needs that the module header does not import.
    from solana.rpc.commitment import Processed
    from solders import message as solders_message

    your_balances = await get_wallet_balances(YOUR_WALLET)
    your_balance_info = your_balances.get(move['token_in'])

    if not your_balance_info:
        message = f"<b>Move Failed:</b>\nNo balance found for token {move['token_in']}"
        logging.warning(message)
        await send_telegram_message(message)
        return

    your_balance = your_balance_info['amount']
    token_name = your_balance_info['name']

    # Swap the same percentage of the balance as the followed move did.
    amount_to_swap = your_balance * (move['percentage_swapped'] / 100)

    if your_balance >= amount_to_swap:
        # AsyncClient talks to the HTTP RPC endpoint, not the websocket URL.
        async_client = AsyncClient(SOLANA_HTTP_URL)
        try:
            secret = os.getenv("PK")
            if not secret:
                raise ValueError("PK environment variable is not set")
            private_key = Keypair.from_bytes(b58decode(secret))
            jupiter = Jupiter(async_client, private_key)

            # NOTE(review): the amount is scaled by 1e6 (six decimals
            # assumed) and slippage is 1 bps; confirm both for the tokens
            # being traded.
            transaction_data = await jupiter.swap(
                input_mint=move['token_in'],
                output_mint=move['token_out'],
                amount=int(amount_to_swap * 1e6),
                slippage_bps=1,
            )

            # jupiter.swap's return value is base64-decoded into a
            # versioned transaction; sign it and submit the raw bytes.
            raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
            signature = private_key.sign_message(
                solders_message.to_bytes_versioned(raw_transaction.message)
            )
            signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
            opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
            result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
            transaction_id = json.loads(result.to_json())['result']

            output_token_info = your_balances.get(move['token_out'], {'name': 'Unknown'})
            output_token_name = output_token_info['name']

            message = (
                f"<b>Move Followed:</b>\n"
                f"Swapped {amount_to_swap:.6f} {token_name} ({move['token_in']}) "
                f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
                f"for {output_token_name} ({move['token_out']})\n"
                f"Transaction: {transaction_id}"
            )
            logging.info(message)
            await send_telegram_message(message)
        except Exception as e:
            error_message = f"<b>Swap Error:</b>\n{str(e)}"
            logging.error(error_message)
            await send_telegram_message(error_message)
        finally:
            # Release the HTTP session opened for this swap.
            await async_client.close()
    else:
        message = (
            f"<b>Move Failed:</b>\n"
            f"Insufficient balance to swap {amount_to_swap:.6f} {token_name} ({move['token_in']})"
        )
        logging.warning(message)
        await send_telegram_message(message)
|
|
# Helper functions (implement these according to your needs)
|
|
|
|
|
|
|
|
async def on_logs(log):
    """Handle one log notification: debug-log it, save it to disk, then process it."""
    logging.debug(f"Received log: {log}")
    for handler in (save_log, process_log):
        await handler(log)
|
|
|
|
|
|
async def subscribe_to_wallet():
    """Keep a ``logsSubscribe`` subscription for FOLLOWED_WALLET alive forever.

    Connects to SOLANA_WS_URL, subscribes to logs mentioning the followed
    wallet at "confirmed" commitment and dispatches every notification to
    ``on_logs``. When the connection drops it reconnects with exponential
    backoff (5 s doubling up to 60 s); the delay returns to 5 s once a
    subscription has been confirmed, so one bad period does not slow down
    every later reconnect.
    """
    uri = SOLANA_WS_URL
    initial_reconnect_delay = 5  # Start with a 5-second delay
    max_reconnect_delay = 60  # Maximum delay of 60 seconds
    reconnect_delay = initial_reconnect_delay

    request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "logsSubscribe",
        "params": [
            {"mentions": [FOLLOWED_WALLET]},
            {"commitment": "confirmed"},
        ],
    }

    while True:
        try:
            async with websockets.connect(uri) as websocket:
                logger.info("Connected to Solana websocket")

                await websocket.send(json.dumps(request))
                logger.info("Subscription request sent")

                while True:
                    try:
                        response = await websocket.recv()
                        response_data = json.loads(response)
                        logger.debug(f"Received response: {response_data}")

                        if 'result' in response_data:
                            # Reply to the subscribe request.
                            subscription_id = response_data['result']
                            await save_subscription_id(subscription_id)
                            logger.info(f"Subscription successful. Subscription id: {subscription_id}")
                            # The link is healthy again: restart the backoff.
                            reconnect_delay = initial_reconnect_delay
                            await send_telegram_message("Connected to Solana network. Watching for transactions now.")
                            await list_initial_wallet_states()
                        elif 'params' in response_data:
                            # Log notification for the followed wallet.
                            await on_logs(response_data['params']['result'])
                        else:
                            logger.warning(f"Unexpected response: {response}")

                    except websockets.exceptions.ConnectionClosedError as e:
                        logger.error(f"Connection closed unexpectedly: {e}")
                        break
                    except json.JSONDecodeError as e:
                        # A malformed frame does not invalidate the socket.
                        logger.error(f"Failed to decode JSON: {e}")
                    except Exception as e:
                        logger.error(f"An unexpected error occurred: {e}")
                        break

        except websockets.exceptions.WebSocketException as e:
            logger.error(f"WebSocket error: {e}")
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")

        logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
        await asyncio.sleep(reconnect_delay)

        # Exponential backoff, capped at max_reconnect_delay.
        reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)
|
|
|
|
|
|
# Module-level logger named after this module; the functions defined above
# refer to it as `logger` and only look it up when they are called.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def main():
    """Set up DEBUG logging, announce startup on Telegram, then run the wallet subscription."""
    # Initialize logging
    logging.basicConfig(level=logging.DEBUG)
    startup_notice = "Solana Agent Started. Connecting to mainnet..."
    await send_telegram_message(startup_notice)
    await subscribe_to_wallet()
|
|
|
|
def run_flask():
    """Serve the Flask app on port 3001 with debug mode and the reloader switched off."""
    # The reloader stays off so the async main function can run in this process.
    flask_options = {"debug": False, "port": 3001, "use_reloader": False}
    app.run(**flask_options)
|
|
|
|
if __name__ == '__main__':
    # Serve the Flask routes from a background thread. It is a daemon
    # thread so the process can exit when the async main function ends or
    # is interrupted. The original started this sequence twice.
    flask_thread = threading.Thread(target=run_flask, daemon=True)
    flask_thread.start()

    # Run the async main function; asyncio.run creates and closes the loop.
    asyncio.run(main())
|