gogo2/crypto/sol/modules/SolanaAPI.py
2024-11-05 16:38:37 +02:00

1169 lines
56 KiB
Python

import struct
import sys
import os
import aiohttp
# Make the parent of this file's directory importable; the `config` and
# `modules.utils` imports further down are presumably resolved through it.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Get the directory where the current script is located
script_dir = os.path.dirname(os.path.abspath(__file__))
# Parent of script_dir (the same path that was appended to sys.path above).
root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
from dexscreener import DexscreenerClient
from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.async_api import AsyncClient
from solana.transaction import Signature
from solana.rpc.websocket_api import connect
from solana.rpc.commitment import Confirmed, Processed
from solana.transaction import Transaction
from spl.token.client import Token
from base64 import b64decode
import base58
from threading import Thread
from solana.rpc.async_api import AsyncClient
from solana.rpc.types import TxOpts
from solana.rpc.commitment import Confirmed, Finalized, Processed
from solders.rpc.requests import GetTransaction
from solders.signature import Signature
from solders.pubkey import Pubkey
from solders.keypair import Keypair
from solders.transaction import VersionedTransaction
from solders.transaction import Transaction
from solders.message import Message
from solders.instruction import Instruction
from solders.hash import Hash
from solders.instruction import CompiledInstruction
from solders import message
from jupiter_python_sdk.jupiter import Jupiter
import asyncio
import json
import logging
import random
import websockets
from typing import Dict, List, Optional
import requests
from datetime import datetime
from solana.rpc.types import TokenAccountOpts, TxOpts
from typing import List, Dict, Any, Tuple
import traceback
import base64
# # # solders/solana libs (solana_client) # # #
from spl.token._layouts import MINT_LAYOUT
from solana.rpc.api import Client, Pubkey
from spl.token.async_client import AsyncToken
from spl.token.constants import TOKEN_PROGRAM_ID
from borsh_construct import String, CStruct
# ------------------
# Module-level logger. NOTE: several functions in this file log through the
# root `logging` module instead of this logger.
logger = logging.getLogger(__name__)
# Presumably the websocket ping interval in seconds; SolanaWS.connect pings every 30 s.
PING_INTERVAL = 30
SUBSCRIBE_INTERVAL = 10*60 # 10*60 = 600; presumably seconds, i.e. resubscribe every 10 minutes
from config import (FOLLOW_AMOUNT, FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS, YOUR_WALLET, SOLANA_WS_URL)
from modules.utils import telegram_utils, async_safe_call, get_pk
# Shared async RPC client for the SOLANA_HTTP_URL endpoint taken from config
# (whether that is a production endpoint depends on the configured value).
solana_client = AsyncClient(SOLANA_HTTP_URL)
# Shared DexScreener API client.
dexscreener_client = DexscreenerClient()
class SolanaWS:
def __init__(self, on_message: Optional[callable] = None):
self.websocket = None
self.subscription_id = None
self.message_queue = asyncio.Queue()
self.on_message = on_message
self.websocket = None
self.last_msg_responded = False
async def connect(self):
    """Open a websocket to one of SOLANA_ENDPOINTS, retrying until it succeeds.

    A random endpoint is picked for every attempt. Failures are logged and
    retried after a 5 second pause, so the coroutine only returns once
    self.websocket holds an open connection.
    """
    while True:
        # Bound before the try so the error log below can always reference it,
        # even when choosing the endpoint itself is what failed.
        current_url = None
        try:
            current_url = random.choice(SOLANA_ENDPOINTS)
            self.websocket = await websockets.connect(
                current_url, ping_interval=PING_INTERVAL, ping_timeout=10
            )
            logger.info(f"Connected to Solana websocket: {current_url}")
            return
        except Exception as e:
            logger.error(f"Failed to connect to {current_url}: {e}")
            await asyncio.sleep(5)
async def ws_jsonrpc(self, method, params=None, doProcessResponse = True):
    """Send one JSON-RPC 2.0 request over the websocket.

    Args:
        method: RPC method name.
        params: a list is sent as-is; None becomes an empty list; any other
            value is wrapped in a one-element list.
        doProcessResponse: when False, return right after sending without
            reading from the socket.

    Returns:
        The 'result' member of the next frame read from the socket, or None
        when no reply was awaited or the frame carried no 'result'.
    """
    if params is None:
        params = []
    elif not isinstance(params, list):
        params = [params]

    payload = json.dumps({
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params
    })
    self.last_msg_responded = False
    await self.websocket.send(payload)
    if not doProcessResponse:
        return None

    # The next frame on the socket is taken to be the reply to this request.
    reply = json.loads(await self.websocket.recv())
    self.last_msg_responded = True
    if 'result' in reply:
        return reply['result']
    if 'error' in reply:
        logger.error(f"Error in WebSocket RPC call: {reply['error']}")
    else:
        logger.warning(f"Unexpected response: {reply}")
    return None
async def subscribe(self):
    """Subscribe to confirmed logs that mention FOLLOWED_WALLET.

    The request is sent without reading the reply inline; the reply is then
    pulled through receive_messages()/process_messages() in single-message
    mode and, when it is a positive number, stored as self.subscription_id.
    """
    filters = [
        {"mentions": [FOLLOWED_WALLET]},
        {"commitment": "confirmed"}
    ]
    await self.ws_jsonrpc("logsSubscribe", filters, False)
    # Short pause before reading the reply.
    await asyncio.sleep(.4)
    await self.receive_messages(True)
    result = await self.process_messages(True)

    got_id = result is not None and result > 0
    if got_id:
        self.subscription_id = result
        logger.info(f"Subscription successful. Subscription id: {self.subscription_id}")
    elif result:
        logger.error("already subscribed")
    else:
        logger.error("Failed to subscribe")
async def unsubscribe(self):
    """Cancel the active logs subscription, if there is one.

    self.subscription_id is cleared only when the RPC call returns a truthy
    result; otherwise the failure is logged and the id is kept.
    """
    if not self.subscription_id:
        return
    result = await self.ws_jsonrpc("logsUnsubscribe", [self.subscription_id])
    if not result:
        logger.error(f"Failed to unsubscribe from subscription id: {self.subscription_id}")
        return
    logger.info(f"Unsubscribed from subscription id: {self.subscription_id}")
    self.subscription_id = None
async def receive_messages(self, one = False):
    """Read frames from the websocket and queue their 'result' payloads.

    Args:
        one: when True, stop after the first frame that carried a 'result'.

    Reading also stops (after logging) on a closed connection or any other
    error. Frames without a top-level 'result' key are read and discarded.
    NOTE(review): Solana subscription notifications are documented as
    carrying their data under 'params' - confirm they are not dropped here.
    """
    keep_reading = True
    while keep_reading:
        try:
            payload = json.loads(await self.websocket.recv())
            self.last_msg_responded = True
            if 'result' not in payload:
                continue
            await self.message_queue.put(payload['result'])
            keep_reading = not one
        except websockets.exceptions.ConnectionClosedError:
            logger.error("WebSocket connection closed")
            keep_reading = False
        except Exception as e:
            logger.error(f"Error receiving message: {e}")
            keep_reading = False
async def process_messages(self, one = False):
    """Take messages off the queue and hand each one to self.on_message.

    Args:
        one: when True, return the first message instead of looping forever.

    A plain int message is additionally logged as a subscription id. The
    callback, when set, receives every message, including such ids.
    """
    while True:
        message = await self.message_queue.get()
        if type(message) is int:
            logger.info(f"Subscription id: {message}")
        if self.on_message:
            await self.on_message(message)
            logger.info(f"Received message: {message}")
        if one:
            return message
async def close(self):
    """Close the websocket if one was opened; otherwise do nothing."""
    if not self.websocket:
        return
    await self.websocket.close()
    logger.info("WebSocket connection closed")
async def solana_jsonrpc(self, method, params=None, jsonParsed=True):
    """POST one JSON-RPC request to SOLANA_HTTP_URL and return its result.

    Args:
        method: RPC method name.
        params: a list is used as the positional parameters; None becomes an
            empty list; any other value is wrapped in a one-element list.
        jsonParsed: when True the trailing options object also requests the
            "jsonParsed" encoding.

    Returns:
        The 'result' member of the response, or None on any HTTP, transport
        or RPC error (errors are logged, never raised).

    NOTE(review): requests.post is a blocking call made from a coroutine, so
    the event loop is stalled for the duration of the request.
    """
    if params is None:
        call_params = []
    elif isinstance(params, list):
        # Copy: the options object appended below must not leak into the
        # list object owned by the caller.
        call_params = list(params)
    else:
        call_params = [params]

    options = {"maxSupportedTransactionVersion": 0}
    if jsonParsed:
        options = {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}
    call_params.append(options)

    data = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": call_params
    }
    try:
        response = requests.post(
            SOLANA_HTTP_URL,
            headers={"Content-Type": "application/json"},
            data=json.dumps(data),
            timeout=30,  # never hang forever on an unresponsive endpoint
        )
        response.raise_for_status()
        result = response.json()
        if 'result' not in result or 'error' in result:
            # %s placeholder: a bare extra argument makes logging itself fail.
            logger.error("Error fetching data from Solana RPC: %s", result)
            return None
        return result['result']
    except Exception as e:
        logger.error(f"Error fetching data from Solana RPC: {e}")
        return None
class SolanaAPI:
# Base58-encoded private key; follow_move() loads it once through get_pk()
# and stores it on the instance for later calls.
pk = None
def __init__(self, process_transaction_callback = None, on_initial_subscription_callback = None, on_bot_message=None):
    """Wire up the callbacks and the websocket helper.

    Args:
        process_transaction_callback: awaited with every message or signature
            this class wants processed.
        on_initial_subscription_callback: invoked once, after the first
            successful subscription, with the subscription id.
        on_bot_message: status-message sink; when it is not callable, messages
            are written to the module logger instead.
    """
    def log_bot_message(message):
        # Fallback sink used when the caller supplied no usable callback.
        logger.info(f"Bot message: {message}")

    self.process_transaction = process_transaction_callback
    self.on_initial_subscription = on_initial_subscription_callback
    if callable(on_bot_message):
        self.on_bot_message = on_bot_message
    else:
        self.on_bot_message = log_bot_message
    self.dex = DEX
    self.solana_ws = SolanaWS(on_message=self.process_transaction)
async def process_messages(self, solana_ws):
    """Forward every message queued on solana_ws to self.process_transaction.

    Never returns on its own; it ends only through cancellation or an
    exception raised by the callback.
    """
    while True:
        queued = await solana_ws.message_queue.get()
        await self.process_transaction(queued)
# NOTE(review): not referenced anywhere in the visible part of this file;
# wallet_watch_loop() tracks the same state in a local `first_subscription`.
_first_subscription = True
async def wallet_watch_loop(self):
    """Keep a logs subscription for the followed wallet alive forever.

    Each cycle connects, subscribes, then runs the receive and process
    coroutines of a private SolanaWS instance. As soon as either of them
    stops (for example because the socket closed), both are cancelled, the
    connection is torn down and, after a 5 second pause, a new cycle starts.
    self.on_initial_subscription is invoked only on the first successful
    subscription; self.on_bot_message is told about every (re)connect.
    """
    solana_ws = SolanaWS(on_message=self.process_transaction)
    first_subscription = True
    while True:
        # Bound up front so the cleanup below is safe even when connect() or
        # subscribe() fails before the tasks exist.
        receive_task = None
        process_task = None
        try:
            await solana_ws.connect()
            await solana_ws.subscribe()
            if first_subscription:
                first_subscription = False
                await async_safe_call(self.on_initial_subscription, solana_ws.subscription_id)
            await async_safe_call(self.on_bot_message,f"Solana mainnet connected ({solana_ws.subscription_id})...")
            receive_task = asyncio.create_task(solana_ws.receive_messages())
            process_task = asyncio.create_task(solana_ws.process_messages())
            try:
                # process_messages() never returns by itself, so waiting for
                # BOTH tasks would hang forever once the receiver stops on a
                # closed socket. Wake up when the first one finishes instead.
                done, _pending = await asyncio.wait(
                    {receive_task, process_task},
                    return_when=asyncio.FIRST_COMPLETED,
                )
                for finished in done:
                    # Surface an exception raised inside the task so it is
                    # logged by the handler below.
                    finished.result()
            except asyncio.CancelledError:
                # NOTE(review): cancellation is swallowed here (as before), so
                # cancelling this coroutine only restarts the cycle.
                pass
            finally:
                receive_task.cancel()
                process_task.cancel()
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")
            logger.error("".join(traceback.format_exception(None, e, e.__traceback__)))
        finally:
            try:
                await solana_ws.unsubscribe()
                if solana_ws.websocket:
                    await solana_ws.close()
                await async_safe_call(self.on_bot_message,"Reconnecting...")
            except Exception as e:
                logger.error(f"An error occurred while unsubscribing: {e}")
            finally:
                # Make sure no task of this cycle survives into the next one.
                for task in (receive_task, process_task):
                    if task is not None and not task.cancelled():
                        task.cancel()
            await asyncio.sleep(5)
async def get_last_transactions(self, account_address, check_interval=300, limit=1000):
    """Poll getSignaturesForAddress forever and process new signatures.

    Args:
        account_address: address whose signatures are listed.
        check_interval: minimum number of seconds between two RPC polls.
        limit: maximum number of signatures requested per poll.

    Every returned signature entry is passed to self.process_transaction.
    After the first poll only signatures newer than the newest one already
    seen are requested. Never returns.
    """
    last_check_time = None
    last_signature = None
    while True:
        current_time = datetime.now()
        poll_due = (
            last_check_time is None
            or (current_time - last_check_time).total_seconds() >= check_interval
        )
        if poll_due:
            options = {"limit": limit}
            if last_signature:
                # "until" stops the newest-first listing at the signature we
                # already handled; "before" would page into OLDER history and
                # re-process the same transactions on every poll.
                options["until"] = last_signature
            result = await self.solana_ws.solana_jsonrpc("getSignaturesForAddress", [account_address, options])
            if result:
                for signature in result:
                    if last_signature and signature['signature'] == last_signature:
                        break
                    await self.process_transaction(signature)
                # The first entry is the newest signature in the batch.
                last_signature = result[0]['signature']
            last_check_time = current_time
        await asyncio.sleep(1)
async def get_token_metadata_symbol(self, mint_address):
    """Return the symbol of a token, filling DEX.TOKENS_INFO on the way.

    A cached symbol is returned immediately. Otherwise decimals/name are
    taken from the parsed getAccountInfo reply (when present), name/symbol
    from get_token_metadata(), and the merged entry is persisted through
    DEX.save_token_info().

    Returns:
        The symbol string, or None when it could not be determined.
    """
    cached = DEX.TOKENS_INFO.get(mint_address)
    if cached and 'symbol' in cached:
        return cached.get('symbol')
    try:
        account = await self.solana_ws.solana_jsonrpc("getAccountInfo", mint_address)
        # The RPC helper returns None on failure and 'value' is null for an
        # unknown account; neither case should abort the metadata lookup.
        value = (account or {}).get('value') or {}
        data = value.get('data')
        parsed = data.get('parsed') if isinstance(data, dict) else None
        info = parsed.get('info') if isinstance(parsed, dict) else None
        if isinstance(info, dict):
            if 'decimals' in info:
                DEX.TOKENS_INFO.setdefault(mint_address, {})['decimals'] = info['decimals']
            if 'tokenName' in info:
                DEX.TOKENS_INFO.setdefault(mint_address, {})['name'] = info['tokenName']

        metadata = await self.get_token_metadata(mint_address)
        if metadata:
            DEX.TOKENS_INFO.setdefault(mint_address, {}).update(metadata)
            await DEX.save_token_info()
        # .get() on both levels: an unknown mint yields None, not a KeyError.
        return DEX.TOKENS_INFO.get(mint_address, {}).get('symbol')
    except Exception as e:
        logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
        return None
async def get_token_metadata(self, mint_address):
    """Read name and symbol from the token's on-chain metadata account.

    Args:
        mint_address: mint as a base58 string or as a Pubkey.

    Returns:
        {"name", "symbol", "address"} where a field that fails to decode is
        None; None when the metadata account does not exist or on any error.
    """
    def read_string(buf, pos):
        # One field = little-endian u32 length followed by that many bytes.
        # Returns (text or None, offset just past the field).
        (length,) = struct.unpack("<I", buf[pos:pos + 4])
        pos += 4
        try:
            text = buf[pos:pos + length].decode('utf-8').rstrip("\x00")
        except Exception as e:
            text = None
        return text, pos + length

    try:
        mint_pubkey = Pubkey.from_string(mint_address) if isinstance(mint_address, str) else mint_address
        metadata_program_id = Pubkey.from_string("metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s")
        # Derive the metadata account address from the mint.
        seeds = [b"metadata", bytes(metadata_program_id), bytes(mint_pubkey)]
        metadata_account, _bump = Pubkey.find_program_address(seeds, metadata_program_id)

        metadata_account_info = await solana_client.get_account_info(metadata_account)
        if metadata_account_info.value is None:
            return None
        data = metadata_account_info.value.data
        # Skip 1 + 32 + 32 bytes (version, update authority, mint).
        name, cursor = read_string(data, 1 + 32 + 32)
        symbol, cursor = read_string(data, cursor)
        return {"name": name, "symbol": symbol, "address": mint_address}
    except Exception as e:
        logging.error(f"Error fetching token metadata for {mint_address}: {str(e)}")
        return None
async def get_transaction_details_rpc(self, tx_signature, readfromDump=False):
    """Fetch one transaction over RPC and summarise the token swap in it.

    Args:
        tx_signature: signature passed to the getTransaction RPC call.
        readfromDump: when True and ./logs/transation_details.json exists,
            that stored JSON is returned as-is (raw, not summarised) and no
            RPC call is made.

    Returns:
        A dict with order_id, token_in/token_out, amount_in/amount_out, their
        USD values and percentage_swapped (plus symbol_in/symbol_out and
        before_source_balance when they could be determined), or None when
        the transaction could not be fetched.
    """
    dump_path = './logs/transation_details.json'
    try:
        if readfromDump and os.path.exists(dump_path):
            with open(dump_path, 'r') as f: # trump_swap_tr_details
                transaction_details = json.load(f)
            return transaction_details

        transaction_details = await self.solana_ws.solana_jsonrpc("getTransaction", tx_signature)
        try:
            with open(dump_path, 'w') as f:
                json.dump(transaction_details, f, indent=2)
        except OSError as e:
            # The dump is only a debugging aid; a missing ./logs directory
            # must not prevent the transaction from being parsed.
            logging.warning(f"Could not write {dump_path}: {e}")

        if transaction_details is None:
            logging.error(f"Error fetching transaction details for {tx_signature}")
            return None

        # Initialize default result structure
        parsed_result = {
            "order_id": None,
            "token_in": None,
            "token_out": None,
            "amount_in": 0,
            "amount_out": 0,
            "amount_in_USD": 0,
            "amount_out_USD": 0,
            "percentage_swapped": 0
        }
        meta = transaction_details.get("meta") or {}

        # Extract order_id from logs ("Program log: order_id: <id>")
        for log in meta.get("logMessages") or []:
            if "order_id" in log:
                parsed_result["order_id"] = log.split(":")[2].strip()
                break

        inner_instructions = meta.get('innerInstructions') or []

        # First pass: the first positive transferChecked gives token_in.
        # NOTE(review): this pass never sets token_out, so the second pass
        # below always runs - confirm that is intended.
        for instruction_set in inner_instructions:
            for instruction in instruction_set.get('instructions', []):
                if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked':
                    info = instruction['parsed']['info']
                    mint = info['mint']
                    amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals
                    if parsed_result["token_in"] is None and amount > 0:
                        parsed_result["token_in"] = mint
                        parsed_result["amount_in"] = amount

        if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
            # Second pass: collect every spl-token transfer and treat the
            # first one as the input leg and the last one as the output leg.
            transfers = []
            for instruction_set in inner_instructions:
                for instruction in instruction_set.get('instructions', []):
                    if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']:
                        info = instruction['parsed']['info']
                        amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount'])
                        # A plain 'transfer' has no tokenAmount: the amount
                        # stays raw here and is scaled during mint resolution.
                        decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0
                        transfers.append({
                            'mint': info.get('mint'),
                            'amount': amount / (10 ** decimals),
                            'source': info['source'],
                            'destination': info['destination']
                        })

            # Identify token_in and token_out
            if len(transfers) >= 2:
                parsed_result["token_in"] = transfers[0]['mint']
                parsed_result["amount_in"] = transfers[0]['amount']
                parsed_result["token_out"] = transfers[-1]['mint']
                parsed_result["amount_out"] = transfers[-1]['amount']

            # If mint is not provided, query the Solana network for the
            # account data. Guarded on `transfers`: indexing an empty list
            # used to raise IndexError.
            if transfers and (parsed_result["token_in"] is None or parsed_result["token_out"] is None):
                # do only first and last transfer
                for transfer in (transfers[0], transfers[-1]):
                    if transfer['mint'] is not None:
                        continue
                    account = await self.solana_ws.solana_jsonrpc("getAccountInfo", transfer['source'])
                    # None (RPC failure) and a null 'value' are both tolerated.
                    value = (account or {}).get('value') or {}
                    data = value.get('data')
                    parsed = data.get('parsed') if isinstance(data, dict) else None
                    info = parsed.get('info') if isinstance(parsed, dict) else None
                    mint = info.get('mint') if isinstance(info, dict) else None
                    if mint is None:
                        continue
                    transfer['mint'] = mint
                    # Fetch metadata only when the mint is NOT cached yet or
                    # lacks decimals (the old test was inverted and raised
                    # KeyError for unknown mints).
                    if mint not in DEX.TOKENS_INFO or 'decimals' not in DEX.TOKENS_INFO[mint]:
                        await self.get_token_metadata_symbol(mint)
                    # get actual prices
                    current_price = await DEX.get_token_prices([mint])
                    token_info = DEX.TOKENS_INFO[mint]
                    price = token_info.get('price', current_price.get(mint, 0))
                    if parsed_result["token_in"] is None:
                        parsed_result["token_in"] = mint
                        parsed_result["symbol_in"] = token_info.get('symbol')
                        parsed_result["amount_in"] = transfer['amount']/10**token_info['decimals']
                        parsed_result["amount_in_USD"] = parsed_result["amount_in"] * price
                    elif parsed_result["token_out"] is None:
                        parsed_result["token_out"] = mint
                        parsed_result["symbol_out"] = token_info.get('symbol')
                        parsed_result["amount_out"] = transfer['amount']/10**token_info['decimals']
                        parsed_result["amount_out_USD"] = parsed_result["amount_out"] * price

        # Balance of token_in held by the followed wallet before the swap.
        for balance in meta.get('preTokenBalances') or []:
            if balance.get('mint') == parsed_result["token_in"] and balance.get('owner') == FOLLOWED_WALLET:
                parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals']
                break

        # Calculate percentage swapped
        try:
            if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0:
                parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100
        except Exception as e:
            logging.error(f"Error calculating percentage swapped: {e}")
        return parsed_result
    except requests.exceptions.RequestException as e:
        print("Error fetching transaction details:", e)
# "Program log: Instruction: Swap2",
# "Program log: order_id: 13985890735038016",
# "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn", source
# "Program log: EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", target
# "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410",
# "Program log: after_source_balance: 0, after_destination_balance: 472509072",
# "Program log: source_token_change: 58730110139, destination_token_change: 270131294",
async def get_transaction_details_info(self, tx_signature_str: str, logs: List[str]) -> Dict[str, Any]:
    """Fetch swap details (with retries) and recompute percentage_swapped.

    Args:
        tx_signature_str: signature of the transaction to look up.
        logs: accepted for the caller's convenience; not used here.

    Returns:
        Whatever get_transaction_details_with_retry() produced. When it
        carries a 'before_source_balance', 'percentage_swapped' is set to
        amount_in / balance * 100, or to 50 when the balance is not positive.
        Any failure of that calculation is logged and otherwise ignored.
    """
    tr_info = await self.get_transaction_details_with_retry(tx_signature_str)
    try:
        if tr_info['before_source_balance'] > 0:
            share = (tr_info['amount_in'] / tr_info['before_source_balance']) * 100
        else:
            share = 50
        tr_info['percentage_swapped'] = share
    except Exception as e:
        logging.error(f"Error calculating percentage swapped: {e}")
    return tr_info
# def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
# pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', [])
# for balance in pre_balances:
# if balance['mint'] == token:
# return float(balance['uiTokenAmount']['amount'])
# return 0.0
async def get_transaction_details_with_retry(self, transaction_id, retry_delay = 5, max_retries = 16, backoff= True):
    """Call get_transaction_details_rpc() until it yields a result.

    Args:
        transaction_id: signature of the transaction to look up.
        retry_delay: seconds to wait before the second attempt.
        max_retries: maximum number of attempts.
        backoff: when True the delay grows by 20% after every wait.

    Returns:
        The first non-None details dict, or None when every attempt failed
        (including attempts that raised) or max_retries is 0.
    """
    # Bound before the loop: if every attempt raises, or no attempt is made,
    # the return statement must still have a value to hand back.
    tx_details = None
    for attempt in range(max_retries):
        try:
            tx_details = await self.get_transaction_details_rpc(transaction_id)
            if tx_details is not None:
                break
        except Exception as e:
            logging.error(f"Error fetching transaction details: {e}")
        if attempt == max_retries - 1:
            # Nothing left to retry, so do not sleep before giving up.
            break
        logging.info(f"({attempt} of {max_retries}) Waiting for transaction details for {transaction_id}. retry in {retry_delay} s.")
        await asyncio.sleep(retry_delay)
        if backoff:
            retry_delay = retry_delay * 1.2
    return tx_details
async def get_swap_transaction_details(self, tx_signature_str):
    """Summarise a swap from the top-level instructions of a transaction.

    Args:
        tx_signature_str: base58 transaction signature.

    Returns:
        A dict with the same keys as get_transaction_details_rpc() produces,
        or None when the transaction cannot be fetched or parsed.

    NOTE(review): the loop only looks at CompiledInstruction objects and
    reads `.parsed.info` from them - confirm the RPC response really
    contains instructions of that shape, otherwise nothing is extracted.
    """
    try:
        # The class defines no get_transaction(); the RPC client does, with
        # exactly this call signature. Fetching inside the try also means a
        # failed lookup is logged and reported as None like any other error.
        t = await solana_client.get_transaction(
            Signature.from_string(tx_signature_str),
            max_supported_transaction_version=0,
        )
        parsed_result = {
            "order_id": None,
            "token_in": None,
            "token_out": None,
            "amount_in": 0,
            "amount_out": 0,
            "amount_in_USD": 0,
            "amount_out_USD": 0,
            "percentage_swapped": 0
        }
        instructions = t.value.transaction.transaction.message.instructions
        token_program = Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
        # Parse the swap instruction to extract token addresses and amounts
        for instruction in instructions:
            if not isinstance(instruction, CompiledInstruction):
                continue
            if instruction.program_id != token_program:
                continue
            parsed_info = instruction.parsed.info
            mint = parsed_info["mint"]
            amount = float(parsed_info["tokenAmount"]["amount"]) / (10 ** parsed_info["tokenAmount"]["decimals"])
            # The first positive transfer is the input leg, the next the output leg.
            if parsed_result["token_in"] is None and amount > 0:
                parsed_result["token_in"] = mint
                parsed_result["amount_in"] = amount
            elif parsed_result["token_out"] is None:
                parsed_result["token_out"] = mint
                parsed_result["amount_out"] = amount
        # Calculate percentage swapped
        if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0:
            parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100
        return parsed_result
    except Exception as e:
        logging.error(f"Error fetching transaction details: {e}")
        return None
async def get_token_balance_rpc(self, wallet_address, token_address):
    """Return the UI balance of one token held by a wallet.

    Args:
        wallet_address: owner of the token account.
        token_address: mint of the token.

    Returns:
        The uiAmount of the wallet's first token account for that mint as a
        float, or 0 when there is no such account, no balance is reported,
        or a lookup fails.
    """
    try:
        accounts = await self.solana_ws.solana_jsonrpc("getTokenAccountsByOwner", [
            wallet_address,
            {
                "mint": token_address
            }])
        # The RPC helper returns None on failure; treat it like "no account".
        if not accounts or not accounts.get('value'):
            logging.debug(f"No account found for {token_address} in {wallet_address}")
            return 0

        first_account = accounts['value'][0]['pubkey']
        # This call was previously not awaited, which left `balance` holding
        # a coroutine object and made the membership test below raise.
        balance = await self.solana_ws.solana_jsonrpc("getTokenAccountBalance", first_account)
        if not balance or 'value' not in balance:
            logging.debug(f"No balance found for {token_address} in {wallet_address}")
            return 0

        # uiAmount can be null in the RPC reply; count that as zero.
        amount = float(balance['value']['uiAmount'] or 0)
        logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}")
        return amount
    except requests.exceptions.RequestException as e:
        logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
        return 0
async def follow_move(self,move):
    """Mirror a swap of the followed wallet from YOUR_WALLET via Jupiter.

    Args:
        move: swap description; this method reads 'token_in', 'token_out',
            'symbol_in', 'amount_in' and, depending on FOLLOW_AMOUNT,
            'percentage_swapped' or 'token_in_price'.

    The amount is sized according to FOLLOW_AMOUNT ('percentage', 'exact' or
    a number), capped at the available balance, and the swap is attempted up
    to three times with the amount reduced by 10% after every failed
    attempt. Progress and failures are reported through telegram_utils;
    nothing is returned and no exception escapes.

    NOTE(review): when a transaction was sent but its details could not be
    read back, the loop sends another swap - confirm this cannot execute the
    move twice.
    """
    try:
        your_balances = await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
        your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
        if your_balance_info is not None:
            # Use the balance
            print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
        else:
            print(f"No ballance found for {move['symbol_in']}. Skipping move.")
            await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
            return
        your_balance = your_balance_info['amount']

        # `or {}`: an uncached token must not crash the symbol lookups below.
        token_info = DEX.TOKENS_INFO.get(move['token_in']) or {}
        token_name_in = token_info.get('symbol') or await self.get_token_metadata_symbol(move['token_in'])
        token_name_out = (DEX.TOKENS_INFO.get(move['token_out']) or {}).get('symbol') or await self.get_token_metadata_symbol(move['token_out'])

        if not your_balance:
            msg = f"<b>Move not followed:</b>\nNo balance found for token {move['symbol_in']}. Cannot follow move."
            logging.warning(msg)
            await telegram_utils.send_telegram_message(msg)
            return

        if FOLLOW_AMOUNT == 'percentage':
            # Calculate the amount to swap based on the same percentage as the followed move
            if move.get('percentage_swapped') is None:
                followed_ballances = await DEX.get_wallet_balances(FOLLOWED_WALLET, doGetTokenName=False)
                followed_ballance = next((balance for balance in followed_ballances.values() if balance['address'] == move['token_in']), None)
                if followed_ballance is not None:
                    # Use the balance
                    print(f"Followed balance: {followed_ballance['amount']} {move['symbol_in']}")
                    move['percentage_swapped'] = (move['amount_in']/followed_ballance['amount']) * 100
                    amount_to_swap = your_balance * (move['percentage_swapped'] / 100)
                else:
                    # fallback to 100 USD
                    # NOTE(review): this is 100 token units, not 100 USD.
                    amount_to_swap = 100
            else:
                amount_to_swap = your_balance * (move['percentage_swapped'] / 100)
        elif FOLLOW_AMOUNT == 'exact':
            amount_to_swap = move['amount_in']
        else:
            try:
                fixed_amount = float(FOLLOW_AMOUNT) # in USD
                fixed_amount_in_token = fixed_amount / move["token_in_price"]
                amount_to_swap = min(fixed_amount_in_token, your_balance)
            except ValueError:
                msg = f"<b>Move not followed:</b>\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number."
                logging.warning(msg)
                await telegram_utils.send_telegram_message(msg)
                return

        amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have

        decimals = token_info.get('decimals')
        if decimals is None:
            # The metadata lookup above may have cached the decimals meanwhile.
            decimals = (DEX.TOKENS_INFO.get(move['token_in']) or {}).get('decimals')
        if decimals is None:
            msg = f"<b>Move not followed:</b>\nUnknown decimals for token {move['token_in']}. Cannot follow move."
            logging.warning(msg)
            await telegram_utils.send_telegram_message(msg)
            return

        # Convert to the token's smallest unit
        # if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9
        amount_lamports = int(amount_to_swap * 10**decimals)
        logging.debug(f"Calculated amount in lamports: {amount_lamports}")

        if your_balance < amount_to_swap: # should not happen
            msg = (
                f"<b>Warning:</b>\n"
                f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {decimals} amount {amount_lamports}).\n This will probably fail. But we will try anyway."
            )
            logging.warning(msg)
            await telegram_utils.send_telegram_message(msg)

        try:
            if self.pk is None:
                self.pk = await get_pk()

            # Bound before the loop so the reporting code below can tell
            # "never sent" apart from "sent but no details".
            tx_details = None
            transaction_id = None
            for retry in range(3):
                async_client = None
                try:
                    private_key = Keypair.from_bytes(base58.b58decode(self.pk))
                    # NOTE(review): AsyncClient is given SOLANA_WS_URL here -
                    # confirm that setting holds an HTTP(S) RPC endpoint.
                    async_client = AsyncClient(SOLANA_WS_URL)
                    jupiter = Jupiter(async_client, private_key)
                    transaction_data = await jupiter.swap(
                        input_mint=move['token_in'],
                        output_mint=move['token_out'],
                        amount=amount_lamports,
                        slippage_bps=300, # Increased to 3%
                    )
                    logging.info(f"Initiating move. Transaction data:\n {transaction_data}")
                    raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
                    signature = private_key.sign_message(message.to_bytes_versioned(raw_transaction.message))
                    signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
                    opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)

                    # send the transaction
                    result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
                    transaction_id = json.loads(result.to_json())['result']
                    notification = f"Follow Transaction Sent:\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>swapping {amount_to_swap:.2f} {token_name_in}</a>"
                    logging.info(notification)
                    await telegram_utils.send_telegram_message(notification)
                    tx_details = await self.get_transaction_details_with_retry(transaction_id, retry_delay=5, max_retries=6, backoff=False)
                    if tx_details is not None:
                        break
                    logging.warning(f"Failed to get transaction details for {transaction_id}.\n Probably transaction failed. Retrying again...")
                    await asyncio.sleep(3)
                except Exception as e:
                    decoded_data = ''# base64.b64decode(transaction_data)
                    error_message = f"<b>Move Failed:</b>\n{str(e)}</b>\n{decoded_data}</b>\n{move}"
                    logging.error(error_message)
                    await telegram_utils.send_telegram_message(error_message)
                    # Retry with 10% less. This used to reference an undefined
                    # name `amount`, which aborted the whole retry loop.
                    amount_lamports = int(amount_lamports * 0.9)
                finally:
                    # Release the per-attempt HTTP client.
                    if async_client is not None:
                        try:
                            await async_client.close()
                        except Exception as close_error:
                            logging.debug(f"Error closing RPC client: {close_error}")

            await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)

            if transaction_id is None:
                # Every attempt failed before a transaction was sent; each
                # failure has already been reported above.
                logging.error("Move not followed: no swap transaction could be sent.")
                return

            try:
                followed_pct = move.get('percentage_swapped')
                # Only mention the percentage when the move actually has one.
                same_pct = f"(same {followed_pct:.2f}% as followed wallet)\n" if followed_pct is not None else ""
                if tx_details is None:
                    logging.info(f"Failed to get transaction details for {transaction_id}")
                    notification = (
                        f"<b>Move Followed, failed to get transaction details.</b>\n"
                        f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['token_in']}) "
                        f"{same_pct}"
                        f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>solscan.io</a>"
                    )
                else:
                    notification = (
                        f"<b>Move Followed:</b>\n"
                        f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['symbol_in']}) "
                        f"{same_pct}"
                        f"for {tx_details['amount_out']:.2f} {token_name_out}"
                        f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>solscan.io</a>"
                    )
                logging.info(notification)
                await telegram_utils.send_telegram_message(notification)
            except Exception as e:
                logging.error(f"Error sending notification: {e}")
        except Exception as e:
            error_message = f"<b>Swap Follow Error:</b>\n{str(e)}"
            logging.error(error_message)
            # if error_message contains 'Program log: Error: insufficient funds'
            if 'insufficient funds' in error_message:
                await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.")
            else:
                await telegram_utils.send_telegram_message(error_message)
    except Exception as e:
        logging.error(f"Error following move: {e}")
class SolanaDEX:
def __init__(self, DISPLAY_CURRENCY: str):
    """Set up price/token caches and load the token-info cache from disk.

    Args:
        DISPLAY_CURRENCY: currency code in which prices are requested.

    The token cache is read from ../cache/token_info.json relative to this
    module's directory; when it cannot be loaded the error is logged and
    TOKENS_INFO starts empty.
    """
    self.DISPLAY_CURRENCY = DISPLAY_CURRENCY
    self.solana_client = AsyncClient("https://api.mainnet-beta.solana.com")

    # In-memory state.
    self.TOKEN_PRICES = {}
    self.TOKEN_ADDRESSES = {}
    self.FOLLOWED_WALLET_VALUE = 0
    self.YOUR_WALLET_VALUE = 0

    cache_file = os.path.join(script_dir, '..', 'cache', 'token_info.json')
    self.token_info_path = os.path.normpath(cache_file)
    self.TOKENS_INFO = {}
    try:
        with open(self.token_info_path, 'r') as f:
            self.TOKENS_INFO = json.load(f)
    except Exception as e:
        logging.error(f"Error loading token info: {str(e)}")
async def get_token_prices(self, token_addresses: List[str]) -> Dict[str, float]:
    """Return a price for every requested token, trying several sources.

    Args:
        token_addresses: mint addresses to price.

    Returns:
        A dict mapping every requested address to a price. The USDC mint is
        fixed at 1.0; the rest is looked up on CoinGecko, Jupiter,
        DexScreener, Raydium and Orca in that order, each source being asked
        only for tokens no earlier source priced. Tokens nobody priced get
        0.0. Every price is also stored in self.TOKENS_INFO, together with
        the token symbol when it was not cached yet.
    """
    usdc_mint = "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"
    prices = {addr: 1.0 for addr in token_addresses if addr == usdc_mint}
    remaining_tokens = [addr for addr in token_addresses if addr not in prices]

    source_methods = (
        "get_prices_from_coingecko",
        "get_prices_from_jupiter",
        "get_prices_from_dexscreener",
        "get_prices_from_raydium",
        "get_prices_from_orca",
    )
    for source in source_methods:
        # Always measured against ALL prices found so far. The DexScreener
        # step used to compare against the CoinGecko result only, so it
        # re-queried and overwrote prices already supplied by Jupiter.
        missing_tokens = [addr for addr in dict.fromkeys(remaining_tokens) if addr not in prices]
        if not missing_tokens:
            break
        # Looked up by name so a source is only touched when it is needed.
        prices.update(await getattr(self, source)(missing_tokens))

    for token in set(token_addresses) - set(prices.keys()):
        prices[token] = 0.0
        logging.warning(f"Price not found for token {token}. Setting to 0.")

    for token, price in prices.items():
        token_info = self.TOKENS_INFO.setdefault(token, {})
        if 'symbol' not in token_info:
            token_info['symbol'] = await SAPI.get_token_metadata_symbol(token)
        token_info['price'] = price
    return prices
async def get_prices_from_coingecko(self, token_addresses: List[str]) -> Dict[str, float]:
    """Query CoinGecko's Solana token-price endpoint, one request per token.

    All requests share one HTTP session and run concurrently. A token is
    included in the result only when the response contains a price for it in
    ``self.DISPLAY_CURRENCY`` (lower-cased); failures are logged per token.

    Args:
        token_addresses: Token mint addresses to look up.

    Returns:
        Mapping of token address to price for the tokens that were found.
    """
    base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana"

    async def fetch_single_price(session, address):
        # Returns (address, price) on success and (address, None) otherwise.
        currency = self.DISPLAY_CURRENCY.lower()
        query = {"contract_addresses": address, "vs_currencies": currency}
        try:
            async with session.get(base_url, params=query) as response:
                if response.status != 200:
                    logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}")
                else:
                    payload = await response.json()
                    if address in payload and currency in payload[address]:
                        return address, payload[address][currency]
        except Exception as e:
            logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}")
        return address, None

    async with aiohttp.ClientSession() as session:
        lookups = await asyncio.gather(
            *[fetch_single_price(session, address) for address in token_addresses]
        )
    return {address: price for address, price in lookups if price is not None}
async def get_prices_from_dexscreener(self, token_addresses: List[str]) -> Dict[str, float]:
    """Fetch USD prices from DexScreener, one request per token.

    Requests run concurrently through ``self.fetch_token_data``. For each
    token the first returned pair is used. A pair without a usable
    ``priceUsd`` is skipped with a warning and does not affect the other
    tokens in the batch.

    Args:
        token_addresses: Token mint addresses to look up.

    Returns:
        Mapping of token address to price for the tokens that were found.
    """
    base_url = "https://api.dexscreener.com/latest/dex/tokens/"
    prices = {}
    try:
        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(
                *(self.fetch_token_data(session, f"{base_url}{address}") for address in token_addresses)
            )
    except Exception as e:
        logging.error(f"Error fetching token prices from DexScreener: {str(e)}")
        return prices

    for address, result in zip(token_addresses, results):
        pairs = result.get('pairs') if isinstance(result, dict) else None
        if not pairs:
            logging.warning(f"No price data found on DexScreener for token {address}")
            continue
        try:
            # Use the first pair (usually the most liquid).
            prices[address] = float(pairs[0]['priceUsd'])
        except (KeyError, IndexError, TypeError, ValueError) as e:
            # Handle the bad entry here so one malformed pair cannot drop
            # the prices of the tokens that follow it.
            logging.warning(f"Unusable DexScreener price for token {address}: {e!r}")
    return prices
async def get_prices_from_jupiter(self, token_addresses: List[str]) -> Dict[str, float]:
    """Fetch prices for all given tokens from Jupiter in a single request.

    The addresses are sent comma-separated in the ``ids`` query parameter.
    Every entry of the response's ``data`` object that carries a ``price``
    is returned. Non-200 responses and exceptions are logged, and whatever
    was collected up to that point is returned.

    Args:
        token_addresses: Token mint addresses to look up.

    Returns:
        Mapping of token address to price.
    """
    endpoint = "https://price.jup.ag/v4/price"
    query = {"ids": ",".join(token_addresses)}
    quotes = {}
    try:
        async with aiohttp.ClientSession() as session, session.get(endpoint, params=query) as response:
            if response.status != 200:
                logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}")
            else:
                payload = await response.json()
                for address, price_info in payload.get('data', {}).items():
                    if 'price' not in price_info:
                        continue
                    quotes[address] = float(price_info['price'])
    except Exception as e:
        logging.error(f"Error fetching token prices from Jupiter: {str(e)}")
    return quotes
async def get_prices_from_raydium(self, token_addresses: List[str]) -> Dict[str, float]:
    """Look up the given tokens in Raydium's price table.

    The whole table is downloaded in one request and the requested addresses
    are picked out of it. Non-200 responses and exceptions are logged, and
    whatever was collected up to that point is returned.

    Args:
        token_addresses: Token mint addresses to look up.

    Returns:
        Mapping of token address to price for the tokens that were found.
    """
    endpoint = "https://api.raydium.io/v2/main/price"
    quotes = {}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(endpoint) as response:
                if response.status != 200:
                    logging.error(f"Failed to get token prices from Raydium. Status: {response.status}")
                else:
                    price_table = await response.json()
                    for address in token_addresses:
                        if address not in price_table:
                            continue
                        quotes[address] = float(price_table[address])
    except Exception as e:
        logging.error(f"Error fetching token prices from Raydium: {str(e)}")
    return quotes
async def get_prices_from_orca(self, token_addresses: List[str]) -> Dict[str, float]:
    """Look up the given tokens in Orca's token list.

    The full list is downloaded in one request and scanned for the requested
    mints. Entries that are not objects or whose mint was not requested are
    ignored. A requested entry without a usable ``price`` is skipped with a
    warning and does not abort the scan.

    Args:
        token_addresses: Token mint addresses to look up.

    Returns:
        Mapping of token address to price for the tokens that were found.
    """
    url = "https://api.orca.so/allTokens"
    prices = {}
    # Build the lookup set once so the scan is linear in the size of the
    # Orca list instead of re-walking token_addresses for every entry.
    wanted = set(token_addresses)
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    for token_info in data:
                        if not isinstance(token_info, dict):
                            continue
                        mint = token_info.get('mint')
                        if mint not in wanted:
                            continue
                        try:
                            prices[mint] = float(token_info['price'])
                        except (KeyError, TypeError, ValueError):
                            logging.warning(f"Orca returned no usable price for token {mint}")
                else:
                    logging.error(f"Failed to get token prices from Orca. Status: {response.status}")
    except Exception as e:
        logging.error(f"Error fetching token prices from Orca: {str(e)}")
    return prices
@staticmethod
async def fetch_token_data(session, url):
try:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
else:
logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
return None
except Exception as e:
logging.error(f"Error fetching data from {url}: {str(e)}")
return None
async def get_sol_price(self) -> float:
    """Return the price of SOL, looked up via the wrapped-SOL mint.

    Delegates to ``get_token_prices`` and falls back to 0.0 when the result
    has no entry for the wrapped-SOL address.
    """
    # Solana's wrapped SOL address
    wrapped_sol_mint = "So11111111111111111111111111111111111111112"
    quotes = await self.get_token_prices([wrapped_sol_mint])
    if wrapped_sol_mint in quotes:
        return quotes[wrapped_sol_mint]
    return 0.0
async def get_wallet_balances(self, wallet_address, doGetTokenName=True):
    """Collect SPL-token and native SOL balances for a wallet.

    Token accounts owned by ``wallet_address`` under the SPL Token program
    are fetched in parsed-JSON form. Only holdings with a UI amount greater
    than 1 are reported. For each reported mint, ``self.TOKENS_INFO`` is
    updated with ``holdedAmount`` and ``decimals``.

    Errors never propagate: a failure for one account is logged and that
    account is skipped; a failure of the RPC calls is logged and whatever
    was collected so far is returned.

    Args:
        wallet_address: Base58 wallet address.
        doGetTokenName: When true, symbols of mints missing from
            ``self.TOKENS_INFO`` are fetched via
            ``self.get_token_metadata_symbol`` (followed by a 2 second
            pause). When false, such mints are reported with name 'N/A'.

    Returns:
        Dict keyed by mint address (plus the key 'SOL' for the native
        balance) with ``name``, ``address``, ``amount`` and, for tokens,
        ``decimals``.
    """
    balances = {}
    # Tracked separately so the summary log works even if the RPC call fails
    # before any response object exists.
    token_account_count = 0
    logging.info(f"Getting balances for wallet: {wallet_address}")
    try:
        owner = Pubkey.from_string(wallet_address)
        response = await self.solana_client.get_token_accounts_by_owner_json_parsed(
            owner,
            opts=TokenAccountOpts(
                program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
            ),
            commitment=Confirmed
        )
        accounts = response.value or []
        token_account_count = len(accounts)
        for account in accounts:
            try:
                parsed_data = account.account.data.parsed
                info = parsed_data.get('info') if isinstance(parsed_data, dict) else None
                if not (isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info):
                    logging.warning(f"Unexpected data format for account: {account}")
                    continue

                mint = info['mint']
                decimals = int(info['tokenAmount']['decimals'])
                amount = float(int(info['tokenAmount']['amount']) / 10**decimals)
                if amount <= 1:
                    continue

                token_entry = self.TOKENS_INFO.get(mint)
                if token_entry is not None:
                    token_name = token_entry.get('symbol')
                elif doGetTokenName:
                    token_name = await self.get_token_metadata_symbol(mint) or 'N/A'
                    token_entry = self.TOKENS_INFO[mint] = {'symbol': token_name}
                    await asyncio.sleep(2)
                else:
                    # Unknown mint and lookups disabled: give it an explicit
                    # name and cache entry instead of reusing the previous
                    # iteration's name or failing on a missing key.
                    token_name = None
                    token_entry = self.TOKENS_INFO.setdefault(mint, {})

                token_entry['holdedAmount'] = round(amount, decimals)
                token_entry['decimals'] = decimals
                balances[mint] = {
                    'name': token_name or 'N/A',
                    'address': mint,
                    'amount': amount,
                    'decimals': decimals
                }
                logging.debug(f"Account balance for {token_name} ({mint}): {amount}")
            except Exception as e:
                logging.error(f"Error parsing account data: {str(e)}")

        sol_balance = await self.solana_client.get_balance(owner)
        if sol_balance.value is not None:
            balances['SOL'] = {
                'name': 'SOL',
                'address': 'SOL',
                # Lamports to SOL.
                'amount': sol_balance.value / 1e9
            }
        else:
            logging.warning(f"SOL balance response missing for wallet: {wallet_address}")
    except Exception as e:
        logging.error(f"Error getting wallet balances: {str(e)}")
    logging.info(f"Found {token_account_count} ({len(balances)} non zero) token accounts for wallet: {wallet_address}")
    return balances
async def convert_balances_to_currency(self, balances, sol_price):
    """Attach a ``value`` field to a copy of every balance entry.

    Entries named 'SOL' are valued with ``sol_price``; other entries use
    ``self.TOKEN_PRICES`` keyed by address. Entries without a known price
    get ``value = None`` and a warning. The input dicts are not modified.

    Args:
        balances: Mapping of address to balance info (``name``, ``amount``).
        sol_price: Price of one SOL.

    Returns:
        New mapping of address to a copied info dict including ``value``.
    """
    valued = {}
    for address, info in balances.items():
        entry = info.copy()
        is_sol = info['name'] == 'SOL'
        if not is_sol and address not in self.TOKEN_PRICES:
            entry['value'] = None
            logging.warning(f"Price not available for token {info['name']} ({address})")
        else:
            unit_price = sol_price if is_sol else self.TOKEN_PRICES[address]
            entry['value'] = info['amount'] * unit_price
        valued[address] = entry
    return valued
async def list_initial_wallet_states(self, FOLLOWED_WALLET, YOUR_WALLET):
    """Snapshot both wallets, price their tokens and log a summary.

    Steps: fetch balances for both wallets, fetch prices for every token
    seen (plus tokens already monitored), convert balances to
    ``self.DISPLAY_CURRENCY``, then update ``self.TOKEN_PRICES``,
    ``self.TOKEN_ADDRESSES`` (address -> info for holdings with a positive
    value), ``self.FOLLOWED_WALLET_VALUE`` and ``self.YOUR_WALLET_VALUE``.
    The HTML-formatted summary is logged and the token cache is saved.

    Args:
        FOLLOWED_WALLET: Address of the wallet being followed.
        YOUR_WALLET: Address of the user's own wallet.
    """
    followed_wallet_balances = await self.get_wallet_balances(FOLLOWED_WALLET)
    your_wallet_balances = await self.get_wallet_balances(YOUR_WALLET)

    # TOKEN_ADDRESSES is keyed by token address (see the assignment below).
    # Its values are info dicts, which are unhashable, so the addresses must
    # come from the keys or any call after the first would raise TypeError.
    all_token_addresses = list(
        set(followed_wallet_balances)
        | set(your_wallet_balances)
        | set(self.TOKEN_ADDRESSES)
    )
    self.TOKEN_PRICES = await self.get_token_prices(all_token_addresses)
    sol_price = await self.get_sol_price()

    followed_converted_balances = await self.convert_balances_to_currency(followed_wallet_balances, sol_price)
    your_converted_balances = await self.convert_balances_to_currency(your_wallet_balances, sol_price)

    def has_value(info):
        # True for holdings that were priced and are worth more than zero.
        return info['value'] is not None and info['value'] > 0

    self.TOKEN_ADDRESSES = {
        address: info
        for address, info in {**followed_converted_balances, **your_converted_balances}.items()
        if has_value(info)
    }
    logging.info(f"Monitoring balances for tokens: {[info['name'] for info in self.TOKEN_ADDRESSES.values()]}")

    followed_wallet_state = []
    self.FOLLOWED_WALLET_VALUE = 0
    for info in followed_converted_balances.values():
        if has_value(info):
            followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {self.DISPLAY_CURRENCY} ({info['address']})")
            self.FOLLOWED_WALLET_VALUE += info['value']

    your_wallet_state = []
    self.YOUR_WALLET_VALUE = 0
    for info in your_converted_balances.values():
        if has_value(info):
            your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {self.DISPLAY_CURRENCY}")
            self.YOUR_WALLET_VALUE += info['value']

    followed_lines = "\n".join(followed_wallet_state)
    your_lines = "\n".join(your_wallet_state)
    monitored_names = ', '.join(
        self.safe_get_property(info, 'name') for info in self.TOKEN_ADDRESSES.values()
    )
    # Named "report" so the local does not shadow the imported
    # solders "message" module.
    report = (
        f"<b>Initial Wallet States (All balances in {self.DISPLAY_CURRENCY}):</b>\n\n"
        f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
        f"{followed_lines}\n"
        f"<b>Total Value:</b> {self.FOLLOWED_WALLET_VALUE:.2f} {self.DISPLAY_CURRENCY}\n\n"
        f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
        f"{your_lines}\n"
        f"<b>Total Value:</b> {self.YOUR_WALLET_VALUE:.2f} {self.DISPLAY_CURRENCY}\n\n"
        f"<b>Monitored Tokens:</b>\n"
        f"{monitored_names}"
    )
    logging.info(report)
    # await telegram_utils.send_telegram_message(message)
    # save token info to file
    await self.save_token_info()
async def save_token_info(self):
    """Write ``self.TOKENS_INFO`` to ``self.token_info_path`` as indented JSON.

    The parent directory is created first when it does not exist, so the
    first save on a fresh checkout (no cache directory yet) succeeds.

    Raises:
        OSError: If the directory or file cannot be created or written.
        TypeError: If ``TOKENS_INFO`` contains values JSON cannot encode.
    """
    cache_dir = os.path.dirname(self.token_info_path)
    if cache_dir:
        os.makedirs(cache_dir, exist_ok=True)
    with open(self.token_info_path, 'w', encoding='utf-8') as f:
        json.dump(self.TOKENS_INFO, f, indent=2)
@staticmethod
def safe_get_property(obj, prop):
return obj.get(prop, 'N/A')
# Module-level singletons shared by the rest of the module.
# DISPLAY_CURRENCY, FOLLOWED_WALLET and YOUR_WALLET are not defined in the
# lines shown here; presumably they are module-level settings - TODO confirm.
DEX = SolanaDEX(DISPLAY_CURRENCY)
# NOTE(review): list_initial_wallet_states is declared `async def`, so calling
# it here yields a coroutine object, not a reusable callable, and a coroutine
# can only be awaited once. Confirm SolanaAPI awaits this value exactly once.
SAPI = SolanaAPI( on_initial_subscription_callback=DEX.list_initial_wallet_states(FOLLOWED_WALLET,YOUR_WALLET))