diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 9eda412..8edec34 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -118,6 +118,8 @@ TOKEN_ADDRESSES = { "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og", } + +# # # # # # # # # # TELEGRAM # # # # # # # # # # async def send_telegram_message(message): try: await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML) @@ -127,6 +129,9 @@ async def send_telegram_message(message): logging.error(f"Error sending Telegram message: {str(e)}") + +# # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # # + async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: coingecko_prices = await get_prices_from_coingecko(token_addresses) @@ -208,6 +213,9 @@ async def get_sol_price_from_dexscreener() -> float: prices = await get_prices_from_dexscreener([sol_address]) return prices.get(sol_address, 0.0) + +# # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # # + async def get_token_balance_rpc(wallet_address, token_address): url = SOLANA_HTTP_URL headers = {"Content-Type": "application/json"} @@ -257,9 +265,9 @@ async def get_token_balance_rpc(wallet_address, token_address): except requests.exceptions.RequestException as e: logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}") return 0 - - - + + +# # # solders/solana libs (solana_client) # # # async def get_token_name(mint_address): try: @@ -334,51 +342,6 @@ async def convert_balances_to_currency(balances , sol_price): converted_balances[address] = converted_balance return converted_balances - -async def list_initial_wallet_states(): - global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES - - followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) - your_wallet_balances = await get_wallet_balances(YOUR_WALLET) - - all_token_addresses = list(set(followed_wallet_balances.keys()) | set(your_wallet_balances.keys())) - TOKEN_PRICES = await get_token_prices(all_token_addresses) - sol_price = await get_sol_price() - - followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) - your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) - - TOKEN_ADDRESSES = {address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0} - logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") - - followed_wallet_state = [] - FOLLOWED_WALLET_VALUE = 0 - for address, info in followed_converted_balances.items(): - if info['value'] is not None and info['value'] > 0: - followed_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}") - FOLLOWED_WALLET_VALUE += info['value'] - - your_wallet_state = [] - YOUR_WALLET_VALUE = 0 - for address, info in your_converted_balances.items(): - if info['value'] is not None and info['value'] > 0: - your_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}") - YOUR_WALLET_VALUE += info['value'] - - message = ( - f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" - f"Followed Wallet ({FOLLOWED_WALLET}):\n" - f"{chr(10).join(followed_wallet_state)}\n" - f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" - f"Your Wallet ({YOUR_WALLET}):\n" - f"{chr(10).join(your_wallet_state)}\n" - f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" - f"Monitored Tokens:\n" - f"{', '.join([info['name'] for info in TOKEN_ADDRESSES.values()])}" - ) - - logging.info(message) - await send_telegram_message(message) async def get_swap_transaction_details(tx_signature_str): t = await solana_client.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0) @@ -393,14 +356,7 @@ async def get_swap_transaction_details(tx_signature_str): "amount_out_USD": 0, "percentage_swapped": 0 } - - # Extract log messages for order_id and swap details. we have that in the log hook - # log_messages = t.value.meta.log_messages - # for log in log_messages: - # if "order_id" in log: - # parsed_result["order_id"] = log.split(":")[1].strip() - # break - + instructions = t.value.transaction.transaction.message.instructions # Parse the swap instruction to extract token addresses, amounts, and types for instruction in instructions: @@ -435,22 +391,10 @@ async def get_swap_transaction_details(tx_signature_str): return None -async def get_transaction_details_with_retry(transaction_id, retry_delay = 11, max_retries = 11): - # wait for the transaction to be confirmed - # await async_client.wait_for_confirmation(Signature.from_string(transaction_id)) - # qwery every 5 seconds for the transaction details untill not None or 30 seconds - for _ in range(max_retries): - try: - tx_details = await get_transaction_details_rpc(transaction_id) - if tx_details is not None: - break - except Exception as e: - logging.error(f"Error fetching transaction details: {e}") - logging.info(f"({_} of {max_retries}) Waiting for transaction details for {transaction_id}") - await asyncio.sleep(retry_delay) - return tx_details - + +# # # RAW Solana API RPC # # # +#this is the meat of the application async def get_transaction_details_rpc(tx_signature, readfromDump=False): try: @@ -610,7 +554,72 @@ async def solana_jsonrpc(method, params = None, jsonParsed = True): logging.error(f"Error fetching data from Solana RPC: {e}") return None - + + +# # # # # # # # # # Functionality # # # # # # # # # # + + +async def list_initial_wallet_states(): + global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES + + followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) + your_wallet_balances = await get_wallet_balances(YOUR_WALLET) + + all_token_addresses = list(set(followed_wallet_balances.keys()) | set(your_wallet_balances.keys())) + TOKEN_PRICES = await get_token_prices(all_token_addresses) + sol_price = await get_sol_price() + + followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) + your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) + + TOKEN_ADDRESSES = {address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0} + logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") + + followed_wallet_state = [] + FOLLOWED_WALLET_VALUE = 0 + for address, info in followed_converted_balances.items(): + if info['value'] is not None and info['value'] > 0: + followed_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}") + FOLLOWED_WALLET_VALUE += info['value'] + + your_wallet_state = [] + YOUR_WALLET_VALUE = 0 + for address, info in your_converted_balances.items(): + if info['value'] is not None and info['value'] > 0: + your_wallet_state.append(f"{info['name']} ({address}): {info['value']:.2f} {DISPLAY_CURRENCY}") + YOUR_WALLET_VALUE += info['value'] + + message = ( + f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" + f"Followed Wallet ({FOLLOWED_WALLET}):\n" + f"{chr(10).join(followed_wallet_state)}\n" + f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Your Wallet ({YOUR_WALLET}):\n" + f"{chr(10).join(your_wallet_state)}\n" + f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Monitored Tokens:\n" + f"{', '.join([info['name'] for info in TOKEN_ADDRESSES.values()])}" + ) + + logging.info(message) + await send_telegram_message(message) + +async def get_transaction_details_with_retry(transaction_id, retry_delay = 11, max_retries = 11): + # wait for the transaction to be confirmed + # await async_client.wait_for_confirmation(Signature.from_string(transaction_id)) + # qwery every 5 seconds for the transaction details untill not None or 30 seconds + for _ in range(max_retries): + try: + tx_details = await get_transaction_details_rpc(transaction_id) + if tx_details is not None: + break + except Exception as e: + logging.error(f"Error fetching transaction details: {e}") + logging.info(f"({_} of {max_retries}) Waiting for transaction details for {transaction_id}") + await asyncio.sleep(retry_delay) + return tx_details + + async def save_log(log): try: os.makedirs('./logs', exist_ok=True) @@ -855,9 +864,6 @@ async def follow_move(move): -async def on_logs(log): - logging.debug(f"Received log: {log}") - await process_log(log) async def subscribe_to_wallet(): @@ -907,7 +913,11 @@ async def subscribe_to_wallet(): await send_telegram_message("Connected to Solana network. Watching for transactions now.") elif 'params' in response_data: - await on_logs(response_data['params']['result']) + log = response_data['params']['result'] + logging.debug(f"Received transaction log: {log}") + # Create a new task for processing the log + asyncio.create_task(process_log(log)) + else: logger.warning(f"Unexpected response: {response}") diff --git a/crypto/sol/readme.md b/crypto/sol/readme.md index d4c989d..6afaabf 100644 --- a/crypto/sol/readme.md +++ b/crypto/sol/readme.md @@ -12,4 +12,24 @@ Save the code in a file, e.g., solana_agent.py. Run the Flask application: -`python app.py` \ No newline at end of file +`python app.py` + + +generate requirements:#!/bin/bash + +# Install necessary tools +pip install pipreqs pip-tools + +# Generate initial requirements +pipreqs . --force + +# Rename requirements.txt to requirements.in +mv requirements.txt requirements.in + +# Generate updated requirements.txt +pip-compile requirements.in + +# Optionally, update your environment +# pip-sync requirements.txt + +echo "Requirements have been updated in requirements.txt"