This commit is contained in:
Dobromir Popov 2024-10-06 01:05:50 +03:00
parent c16129bcf0
commit 774bd333ef

View File

@ -30,6 +30,7 @@ from typing import List, Dict
import requests
import threading
import re
from typing import List, Dict, Any, Tuple
load_dotenv()
app = Flask(__name__)
@ -208,20 +209,6 @@ async def get_sol_price() -> float:
logging.error(f"Failed to get SOL price. Status: {response.status}")
return None
async def convert_balances_to_currency(balances, token_prices, sol_price):
converted_balances = {}
for address, info in balances.items():
converted_balance = info.copy() # Create a copy of the original info
if info['name'] == 'SOL':
converted_balance['value'] = info['amount'] * sol_price
elif address in token_prices:
converted_balance['value'] = info['amount'] * token_prices[address]
else:
converted_balance['value'] = None # Price not available
logging.warning(f"Price not available for token {info['name']} ({address})")
converted_balances[address] = converted_balance
return converted_balances
async def get_token_balance_rpc(wallet_address, token_address):
url = SOLANA_HTTP_URL
@ -332,6 +319,20 @@ async def get_wallet_balances(wallet_address):
return balances
async def convert_balances_to_currency(balances, token_prices, sol_price):
converted_balances = {}
for address, info in balances.items():
converted_balance = info.copy() # Create a copy of the original info
if info['name'] == 'SOL':
converted_balance['value'] = info['amount'] * sol_price
elif address in token_prices:
converted_balance['value'] = info['amount'] * token_prices[address]
else:
converted_balance['value'] = None # Price not available
logging.warning(f"Price not available for token {info['name']} ({address})")
converted_balances[address] = converted_balance
return converted_balances
async def list_initial_wallet_states():
global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE
@ -345,22 +346,22 @@ async def list_initial_wallet_states():
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, token_prices, sol_price)
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, token_prices, sol_price)
TOKEN_ADDRESSES = {token: balance['amount'] for token, balance in {**followed_converted_balances, **your_converted_balances}.items() if balance['amount'] is not None and balance['amount'] > 0}
logging.info(f"Monitoring balances for tokens: {[balance['name'] for balance in TOKEN_ADDRESSES.values()]}")
TOKEN_ADDRESSES = {address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0}
logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}")
followed_wallet_state = []
FOLLOWED_WALLET_VALUE = 0
for token, balance in followed_converted_balances.items():
if balance['amount'] is not None and balance['amount'] > 0:
followed_wallet_state.append(f"{balance['name']} ({balance['address']}): {balance['amount']:.2f} {DISPLAY_CURRENCY}")
FOLLOWED_WALLET_VALUE += balance['amount']
for address, info in followed_converted_balances.items():
if info['value'] is not None and info['value'] > 0:
followed_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}")
FOLLOWED_WALLET_VALUE += info['value']
your_wallet_state = []
YOUR_WALLET_VALUE = 0
for token, balance in your_converted_balances.items():
if balance['amount'] is not None and balance['amount'] > 0:
your_wallet_state.append(f"{balance['name']} ({balance['address']}): {balance['amount']:.2f} {DISPLAY_CURRENCY}")
YOUR_WALLET_VALUE += balance['amount']
for address, info in your_converted_balances.items():
if info['value'] is not None and info['value'] > 0:
your_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}")
YOUR_WALLET_VALUE += info['value']
message = (
f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n"
@ -371,12 +372,13 @@ async def list_initial_wallet_states():
f"{chr(10).join(your_wallet_state)}\n"
f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
f"<b>Monitored Tokens:</b>\n"
f"{', '.join([balance['name'] for balance in TOKEN_ADDRESSES.values()])}"
f"{', '.join([info['name'] for info in TOKEN_ADDRESSES.values()])}"
)
logging.info(message)
await send_telegram_message(message)
async def get_transaction_details_rpc(tx_signature, readfromDump=False):
url = SOLANA_HTTP_URL
# url = 'https://solana.drpc.org'
@ -407,7 +409,7 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False):
if 'result' in transaction_details:
print(transaction_details['result'])
#print(transaction_details['result'])
return transaction_details['result']
else:
print("Unexpected response:", transaction_details)
@ -432,7 +434,7 @@ async def process_log(log_result):
if log_result['value']['err']:
return
tx_signature_str = log_result['value']['signature']
logs = log_result['value']['logs']
try:
# Detect swap operations in logs
@ -441,7 +443,8 @@ async def process_log(log_result):
for log_entry in logs:
if any(op in log_entry for op in swap_operations):
try:
details = await parse_swap_logs(logs)
tx_signature_str = log_result['value']['signature']
details = await parse_swap_logs(tx_signature_str, logs)
message_text = (
f"Swap detected:\n"
@ -471,49 +474,25 @@ async def process_log(log_result):
# "Program log: before_source_balance: 58730110139, before_destination_balance: 202377778, amount_in: 58730110139, expect_amount_out: 270109505, min_return: 267408410",
# "Program log: after_source_balance: 0, after_destination_balance: 472509072",
# "Program log: source_token_change: 58730110139, destination_token_change: 270131294",
async def parse_swap_logs(logs):
async def parse_swap_logs(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]:
token_in = None
token_out = None
amount_in = 0
amount_out_expected = 0
amount_out_actual = 0
amount_out = 0
order_id = None
before_source_balance = 0
for log in logs:
if "Program log:" in log:
if "order_id:" in log:
order_id = log.split("order_id: ")[-1].strip()
elif "Swap2" in log:
token_in = None
token_out = None
elif not token_in:
token_in = log.split("Program log: ")[-1].strip()
elif not token_out:
token_out = log.split("Program log: ")[-1].strip()
if "before_source_balance:" in log:
before_source_balance = int(re.search(r"before_source_balance: (\d+)", log).group(1))
if "amount_in" in log or "amount_out" in log:
amount_matches = re.findall(r"(amount_in|amount_out): (\d+)", log)
for amount_type, value in amount_matches:
if amount_type == "amount_in":
amount_in = int(value)
elif amount_type == "amount_out":
amount_out_expected = int(value)
elif "source_token_change:" in log or "destination_token_change:" in log:
changes = log.split(", ")
for change in changes:
if "source_token_change" in change:
amount_in = int(change.split(": ")[-1])
elif "destination_token_change" in change:
amount_out_actual = int(change.split(": ")[-1])
transaction_details = await get_transaction_details_rpc(tx_signature_str)
token_in, token_out, amount_in, amount_out = _extract_token_info(transaction_details)
# Fetch token prices
token_prices = await get_token_prices([token_in, token_out])
amount_in_usd = amount_in / 1e6 * token_prices.get(token_in, 0)
amount_out_usd = amount_out_actual / 1e6 * token_prices.get(token_out, 0)
# Calculate USD values
amount_in_usd = amount_in * token_prices.get(token_in, 0)
amount_out_usd = amount_out * token_prices.get(token_out, 0)
# Get the pre-balance of the input token
before_source_balance = _get_pre_balance(transaction_details, token_in)
# Calculate the percentage of the source balance that was swapped
percentage_swapped = (amount_in / before_source_balance) * 100 if before_source_balance > 0 else 0
@ -522,14 +501,48 @@ async def parse_swap_logs(logs):
"order_id": order_id,
"token_in": token_in,
"token_out": token_out,
"amount_in": amount_in / 1e6,
"amount_out_expected": amount_out_expected / 1e6,
"amount_out_actual": amount_out_actual / 1e6,
"amount_in": amount_in,
"amount_out": amount_out,
"amount_in_USD": amount_in_usd,
"amount_out_USD": amount_out_usd,
"percentage_swapped": percentage_swapped
}
def _extract_token_info(transaction_details: Dict[str, Any]) -> Tuple[str, str, float, float]:
inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
token_in = None
token_out = None
amount_in = 0
amount_out = 0
for instruction_set in inner_instructions:
for instruction in instruction_set.get('instructions', []):
if 'parsed' in instruction and 'info' in instruction['parsed']:
info = instruction['parsed']['info']
if info.get('type') == 'transfer':
if token_in is None:
token_in = info.get('source')
amount_in = float(info.get('amount', 0))
else:
token_out = info.get('destination')
amount_out = float(info.get('amount', 0))
return token_in, token_out, amount_in, amount_out
def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', [])
for balance in pre_balances:
if balance['mint'] == token:
return float(balance['uiTokenAmount']['amount'])
return 0.0
def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', [])
for balance in pre_balances:
if balance['mint'] == token:
return float(balance['uiTokenAmount']['amount'])
return 0.0
async def follow_move(move):
your_balances = await get_wallet_balances(YOUR_WALLET)
your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
@ -609,6 +622,9 @@ async def subscribe_to_wallet():
reconnect_delay = 5 # Start with a 5-second delay
max_reconnect_delay = 60 # Maximum delay of 60 seconds
await list_initial_wallet_states()
while True:
try:
async with websockets.connect(uri) as websocket:
@ -644,7 +660,6 @@ async def subscribe_to_wallet():
await save_subscription_id(subscription_id)
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
await send_telegram_message("Connected to Solana network. Watching for transactions now.")
await list_initial_wallet_states()
elif 'params' in response_data:
await on_logs(response_data['params']['result'])
@ -679,7 +694,7 @@ async def main():
# Initialize logging
logging.basicConfig(level=logging.DEBUG)
await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
await subscribe_to_wallet()
#await subscribe_to_wallet()
def run_flask():
# Run Flask app without the reloader, so we can run the async main function