order and annotate

This commit is contained in:
Dobromir Popov 2024-10-07 23:25:23 +03:00
parent 69cd254a42
commit 3889ac44a3
2 changed files with 107 additions and 77 deletions

View File

@ -118,6 +118,8 @@ TOKEN_ADDRESSES = {
"TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og", "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og",
} }
# # # # # # # # # # TELEGRAM # # # # # # # # # #
async def send_telegram_message(message): async def send_telegram_message(message):
try: try:
await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML) await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML)
@ -127,6 +129,9 @@ async def send_telegram_message(message):
logging.error(f"Error sending Telegram message: {str(e)}") logging.error(f"Error sending Telegram message: {str(e)}")
# # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # #
async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
coingecko_prices = await get_prices_from_coingecko(token_addresses) coingecko_prices = await get_prices_from_coingecko(token_addresses)
@ -208,6 +213,9 @@ async def get_sol_price_from_dexscreener() -> float:
prices = await get_prices_from_dexscreener([sol_address]) prices = await get_prices_from_dexscreener([sol_address])
return prices.get(sol_address, 0.0) return prices.get(sol_address, 0.0)
# # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # #
async def get_token_balance_rpc(wallet_address, token_address): async def get_token_balance_rpc(wallet_address, token_address):
url = SOLANA_HTTP_URL url = SOLANA_HTTP_URL
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
@ -259,7 +267,7 @@ async def get_token_balance_rpc(wallet_address, token_address):
return 0 return 0
# # # solders/solana libs (solana_client) # # #
async def get_token_name(mint_address): async def get_token_name(mint_address):
try: try:
@ -335,51 +343,6 @@ async def convert_balances_to_currency(balances , sol_price):
return converted_balances return converted_balances
async def list_initial_wallet_states():
global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES
followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
all_token_addresses = list(set(followed_wallet_balances.keys()) | set(your_wallet_balances.keys()))
TOKEN_PRICES = await get_token_prices(all_token_addresses)
sol_price = await get_sol_price()
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price)
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price)
TOKEN_ADDRESSES = {address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0}
logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}")
followed_wallet_state = []
FOLLOWED_WALLET_VALUE = 0
for address, info in followed_converted_balances.items():
if info['value'] is not None and info['value'] > 0:
followed_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}")
FOLLOWED_WALLET_VALUE += info['value']
your_wallet_state = []
YOUR_WALLET_VALUE = 0
for address, info in your_converted_balances.items():
if info['value'] is not None and info['value'] > 0:
your_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}")
YOUR_WALLET_VALUE += info['value']
message = (
f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n"
f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
f"{chr(10).join(followed_wallet_state)}\n"
f"<b>Total Value:</b> {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
f"{chr(10).join(your_wallet_state)}\n"
f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
f"<b>Monitored Tokens:</b>\n"
f"{', '.join([info['name'] for info in TOKEN_ADDRESSES.values()])}"
)
logging.info(message)
await send_telegram_message(message)
async def get_swap_transaction_details(tx_signature_str): async def get_swap_transaction_details(tx_signature_str):
t = await solana_client.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0) t = await solana_client.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0)
try: try:
@ -394,13 +357,6 @@ async def get_swap_transaction_details(tx_signature_str):
"percentage_swapped": 0 "percentage_swapped": 0
} }
# Extract log messages for order_id and swap details. we have that in the log hook
# log_messages = t.value.meta.log_messages
# for log in log_messages:
# if "order_id" in log:
# parsed_result["order_id"] = log.split(":")[1].strip()
# break
instructions = t.value.transaction.transaction.message.instructions instructions = t.value.transaction.transaction.message.instructions
# Parse the swap instruction to extract token addresses, amounts, and types # Parse the swap instruction to extract token addresses, amounts, and types
for instruction in instructions: for instruction in instructions:
@ -435,22 +391,10 @@ async def get_swap_transaction_details(tx_signature_str):
return None return None
async def get_transaction_details_with_retry(transaction_id, retry_delay = 11, max_retries = 11):
# wait for the transaction to be confirmed
# await async_client.wait_for_confirmation(Signature.from_string(transaction_id))
# qwery every 5 seconds for the transaction details untill not None or 30 seconds
for _ in range(max_retries):
try:
tx_details = await get_transaction_details_rpc(transaction_id)
if tx_details is not None:
break
except Exception as e:
logging.error(f"Error fetching transaction details: {e}")
logging.info(f"({_} of {max_retries}) Waiting for transaction details for {transaction_id}")
await asyncio.sleep(retry_delay)
return tx_details
# # # RAW Solana API RPC # # #
#this is the meat of the application
async def get_transaction_details_rpc(tx_signature, readfromDump=False): async def get_transaction_details_rpc(tx_signature, readfromDump=False):
try: try:
@ -611,6 +555,71 @@ async def solana_jsonrpc(method, params = None, jsonParsed = True):
return None return None
# # # # # # # # # # Functionality # # # # # # # # # #
async def list_initial_wallet_states():
global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES
followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
all_token_addresses = list(set(followed_wallet_balances.keys()) | set(your_wallet_balances.keys()))
TOKEN_PRICES = await get_token_prices(all_token_addresses)
sol_price = await get_sol_price()
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price)
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price)
TOKEN_ADDRESSES = {address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0}
logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}")
followed_wallet_state = []
FOLLOWED_WALLET_VALUE = 0
for address, info in followed_converted_balances.items():
if info['value'] is not None and info['value'] > 0:
followed_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}")
FOLLOWED_WALLET_VALUE += info['value']
your_wallet_state = []
YOUR_WALLET_VALUE = 0
for address, info in your_converted_balances.items():
if info['value'] is not None and info['value'] > 0:
your_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}")
YOUR_WALLET_VALUE += info['value']
message = (
f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n"
f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
f"{chr(10).join(followed_wallet_state)}\n"
f"<b>Total Value:</b> {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
f"{chr(10).join(your_wallet_state)}\n"
f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
f"<b>Monitored Tokens:</b>\n"
f"{', '.join([info['name'] for info in TOKEN_ADDRESSES.values()])}"
)
logging.info(message)
await send_telegram_message(message)
async def get_transaction_details_with_retry(transaction_id, retry_delay = 11, max_retries = 11):
# wait for the transaction to be confirmed
# await async_client.wait_for_confirmation(Signature.from_string(transaction_id))
# qwery every 5 seconds for the transaction details untill not None or 30 seconds
for _ in range(max_retries):
try:
tx_details = await get_transaction_details_rpc(transaction_id)
if tx_details is not None:
break
except Exception as e:
logging.error(f"Error fetching transaction details: {e}")
logging.info(f"({_} of {max_retries}) Waiting for transaction details for {transaction_id}")
await asyncio.sleep(retry_delay)
return tx_details
async def save_log(log): async def save_log(log):
try: try:
os.makedirs('./logs', exist_ok=True) os.makedirs('./logs', exist_ok=True)
@ -855,9 +864,6 @@ async def follow_move(move):
async def on_logs(log):
logging.debug(f"Received log: {log}")
await process_log(log)
async def subscribe_to_wallet(): async def subscribe_to_wallet():
@ -907,7 +913,11 @@ async def subscribe_to_wallet():
await send_telegram_message("Connected to Solana network. Watching for transactions now.") await send_telegram_message("Connected to Solana network. Watching for transactions now.")
elif 'params' in response_data: elif 'params' in response_data:
await on_logs(response_data['params']['result']) log = response_data['params']['result']
logging.debug(f"Received transaction log: {log}")
# Create a new task for processing the log
asyncio.create_task(process_log(log))
else: else:
logger.warning(f"Unexpected response: {response}") logger.warning(f"Unexpected response: {response}")

View File

@ -13,3 +13,23 @@ Save the code in a file, e.g., solana_agent.py.
Run the Flask application: Run the Flask application:
`python app.py` `python app.py`
generate requirements:#!/bin/bash
# Install necessary tools
pip install pipreqs pip-tools
# Generate initial requirements
pipreqs . --force
# Rename requirements.txt to requirements.in
mv requirements.txt requirements.in
# Generate updated requirements.txt
pip-compile requirements.in
# Optionally, update your environment
# pip-sync requirements.txt
echo "Requirements have been updated in requirements.txt"