diff --git a/crypto/sol/app.py b/crypto/sol/app.py
index 6c68b93..56821b1 100644
--- a/crypto/sol/app.py
+++ b/crypto/sol/app.py
@@ -5,7 +5,6 @@ from flask import Flask, render_template, request, jsonify
from solana.rpc.async_api import AsyncClient
from solana.transaction import Signature
from solana.rpc.websocket_api import connect
-from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.commitment import Confirmed, Processed
from solana.transaction import Transaction
from spl.token.client import Token
@@ -24,6 +23,7 @@ from solders.instruction import CompiledInstruction
from solders import message
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
from dexscreener import DexscreenerClient
+from solana.rpc.types import TokenAccountOpts, TxOpts
import datetime
import logging
@@ -55,38 +55,9 @@ from config import (
error_logger
)
-from modules.utils import (get_token_prices, get_sol_price, get_wallet_balances, convert_balances_to_currency, get_swap_transaction_details)
-from modules.SolanaAPI import SolanaAPI, solana_jsonrpc, wallet_watch_loop
-from modules.utils import telegram_utils, send_telegram_message
-
-# # config = load_config()
-# load_dotenv()
-# load_dotenv('.env.secret')
-# # Configuration
-# DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID")
-# FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET")
-# YOUR_WALLET = os.getenv("YOUR_WALLET")
-# TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
-# SOLANA_WS_URL = os.getenv("SOLANA_WS_URL")
-# SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL")
-# DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD')
-# BOT_NAME = os.getenv("BOT_NAME")
-
-# logger = logging.getLogger(__name__)
-# logging.basicConfig(level=logging.DEBUG)
-# #logging.basicConfig(level=logging.INFO)
-
-# # Set up error logger
-# log_dir = './logs'
-# log_file = os.path.join(log_dir, 'error.log')
-# os.makedirs(log_dir, exist_ok=True)
-# error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5)
-# error_file_handler.setLevel(logging.ERROR)
-# error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') )
-# error_logger = logging.getLogger('error_logger')
-# error_logger.setLevel(logging.ERROR)
-# error_logger.addHandler(error_file_handler)
+from modules.SolanaAPI import SolanaAPI, SolanaDEX
+from modules.utils import telegram_utils
# Function to find the latest log file
@@ -145,7 +116,7 @@ if not telegram_utils.bot:
asyncio.run(telegram_utils.initialize())
except Exception as e:
logging.error(f"Error initializing Telegram bot: {str(e)}")
-# async def send_telegram_message(message):
+# async def telegram_utils.send_telegram_message(message):
# try:
# await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML)
# logging.info(f"Telegram message sent: {message}")
@@ -161,196 +132,6 @@ if not telegram_utils.bot:
# # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # #
-async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
- global TOKENS_INFO
-
- # Skip for USD
- prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"}
- remaining_tokens = [addr for addr in token_addresses if addr not in prices]
-
- # Try CoinGecko
- coingecko_prices = await get_prices_from_coingecko(remaining_tokens)
- prices.update(coingecko_prices)
-
-
- # For remaining missing tokens, try Jupiter
- missing_tokens = set(remaining_tokens) - set(prices.keys())
- if missing_tokens:
- jupiter_prices = await get_prices_from_jupiter(list(missing_tokens))
- prices.update(jupiter_prices)
-
-
- # For tokens not found in CoinGecko, use DexScreener
- missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys())
- if missing_tokens:
- dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens))
- prices.update(dexscreener_prices)
-
- # For remaining missing tokens, try Raydium
- missing_tokens = set(remaining_tokens) - set(prices.keys())
- if missing_tokens:
- raydium_prices = await get_prices_from_raydium(list(missing_tokens))
- prices.update(raydium_prices)
-
- # For remaining missing tokens, try Orca
- missing_tokens = set(remaining_tokens) - set(prices.keys())
- if missing_tokens:
- orca_prices = await get_prices_from_orca(list(missing_tokens))
- prices.update(orca_prices)
-
- # If any tokens are still missing, set their prices to 0
- for token in set(token_addresses) - set(prices.keys()):
- prices[token] = 0.0
- logging.warning(f"Price not found for token {token}. Setting to 0.")
-
- for token, price in prices.items():
- token_info = TOKENS_INFO.setdefault(token, {})
- if 'symbol' not in token_info:
- token_info['symbol'] = await get_token_metadata_symbol(token)
- token_info['price'] = price
-
- return prices
-
-
-async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
- base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
- prices = {}
-
- async def fetch_single_price(session, address):
- params = {
- "contract_addresses": address,
- "vs_currencies": DISPLAY_CURRENCY.lower()
- }
- try:
- async with session.get(base_url, params=params) as response:
- if response.status == 200:
- data = await response.json()
- if address in data and DISPLAY_CURRENCY.lower() in data[address]:
- return address, data[address][DISPLAY_CURRENCY.lower()]
- else:
- logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}")
- except Exception as e:
- logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}")
- return address, None
-
- async with aiohttp.ClientSession() as session:
- tasks = [fetch_single_price(session, address) for address in token_addresses]
- results = await asyncio.gather(*tasks)
-
- for address, price in results:
- if price is not None:
- prices[address] = price
-
- return prices
-
-async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
- base_url = "https://api.dexscreener.com/latest/dex/tokens/"
- prices = {}
-
- try:
- async with aiohttp.ClientSession() as session:
- tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
- results = await asyncio.gather(*tasks)
-
- for address, result in zip(token_addresses, results):
- if result and 'pairs' in result and result['pairs']:
- pair = result['pairs'][0] # Use the first pair (usually the most liquid)
- prices[address] = float(pair['priceUsd'])
- else:
- logging.warning(f"No price data found on DexScreener for token {address}")
- except Exception as e:
- logging.error(f"Error fetching token prices from DexScreener: {str(e)}")
-
- return prices
-
-async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]:
- url = "https://price.jup.ag/v4/price"
- params = {
- "ids": ",".join(token_addresses)
- }
- prices = {}
-
- try:
- async with aiohttp.ClientSession() as session:
- async with session.get(url, params=params) as response:
- if response.status == 200:
- data = await response.json()
- for address, price_info in data.get('data', {}).items():
- if 'price' in price_info:
- prices[address] = float(price_info['price'])
- else:
- logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}")
- except Exception as e:
- logging.error(f"Error fetching token prices from Jupiter: {str(e)}")
- return prices
-
-# New function for Raydium
-async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]:
- url = "https://api.raydium.io/v2/main/price"
- prices = {}
-
- try:
- async with aiohttp.ClientSession() as session:
- async with session.get(url) as response:
- if response.status == 200:
- data = await response.json()
- for address in token_addresses:
- if address in data:
- prices[address] = float(data[address])
- else:
- logging.error(f"Failed to get token prices from Raydium. Status: {response.status}")
- except Exception as e:
- logging.error(f"Error fetching token prices from Raydium: {str(e)}")
- return prices
-
-# New function for Orca
-async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]:
- url = "https://api.orca.so/allTokens"
- prices = {}
-
- try:
- async with aiohttp.ClientSession() as session:
- async with session.get(url) as response:
- if response.status == 200:
- data = await response.json()
- for token_info in data:
- if token_info['mint'] in token_addresses:
- prices[token_info['mint']] = float(token_info['price'])
- else:
- logging.error(f"Failed to get token prices from Orca. Status: {response.status}")
- except Exception as e:
- logging.error(f"Error fetching token prices from Orca: {str(e)}")
- return prices
-
-
-async def fetch_token_data(session, url):
- try:
- async with session.get(url) as response:
- if response.status == 200:
- return await response.json()
- else:
- logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
- return None
- except Exception as e:
- logging.error(f"Error fetching data from {url}: {str(e)}")
- return None
-
-async def get_sol_price() -> float:
- url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}"
- async with aiohttp.ClientSession() as session:
- async with session.get(url) as response:
- if response.status == 200:
- data = await response.json()
- return data['solana'][DISPLAY_CURRENCY.lower()]
- else:
- logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}")
- return await get_sol_price_from_dexscreener()
-
-async def get_sol_price_from_dexscreener() -> float:
- sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address
- prices = await get_prices_from_dexscreener([sol_address])
- return prices.get(sol_address, 0.0)
-
# # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # #
@@ -415,43 +196,6 @@ from spl.token.async_client import AsyncToken
from spl.token.constants import TOKEN_PROGRAM_ID
from borsh_construct import String, CStruct
-async def get_token_metadata_symbol(mint_address):
- global TOKENS_INFO
-
- if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]:
- return TOKENS_INFO[mint_address].get('symbol')
-
- try:
- account_data_result = await solana_jsonrpc("getAccountInfo", mint_address)
- if 'value' in account_data_result and 'data' in account_data_result['value']:
- account_data_data = account_data_result['value']['data']
- if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
- account_data_info = account_data_data['parsed']['info']
- if 'decimals' in account_data_info:
- if mint_address in TOKENS_INFO:
- TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals']
- else:
- TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']}
- if 'tokenName' in account_data_info:
- if mint_address in TOKENS_INFO:
- TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName']
- else:
- TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']}
-
- metadata = await get_token_metadata(mint_address)
- if metadata:
- if mint_address in TOKENS_INFO:
- TOKENS_INFO[mint_address].update(metadata)
- else:
- TOKENS_INFO[mint_address] = metadata
- await save_token_info()
- # TOKENS_INFO[mint_address] = metadata
- # return metadata.get('symbol') or metadata.get('name')
- return TOKENS_INFO[mint_address].get('symbol')
- except Exception as e:
- logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
- return None
-
@@ -557,80 +301,6 @@ async def get_token_metadata(mint_address):
return None
-async def get_wallet_balances(wallet_address, doGetTokenName=True):
- balances = {}
- logging.info(f"Getting balances for wallet: {wallet_address}")
- global TOKENS_INFO
- try:
- response = await solana_client.get_token_accounts_by_owner_json_parsed(
- Pubkey.from_string(wallet_address),
- opts=TokenAccountOpts(
- program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
- ),
- commitment=Confirmed
- )
-
- if response.value:
- for account in response.value:
- try:
- parsed_data = account.account.data.parsed
- if isinstance(parsed_data, dict) and 'info' in parsed_data:
- info = parsed_data['info']
- if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info:
- mint = info['mint']
- decimals = info['tokenAmount']['decimals']
- amount = float(info['tokenAmount']['amount'])/10**decimals
- if amount > 0:
- if mint in TOKENS_INFO:
- token_name = TOKENS_INFO[mint].get('symbol')
- elif doGetTokenName:
- token_name = await get_token_metadata_symbol(mint) or 'N/A'
- # sleep for 1 second to avoid rate limiting
- await asyncio.sleep(2)
-
- TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals)
- TOKENS_INFO[mint]['decimals'] = decimals
- balances[mint] = {
- 'name': token_name or 'N/A',
- 'address': mint,
- 'amount': amount,
- 'decimals': decimals
- }
- # sleep for 1 second to avoid rate limiting
- logging.debug(f"Account balance for {token_name} ({mint}): {amount}")
- else:
- logging.warning(f"Unexpected data format for account: {account}")
- except Exception as e:
- logging.error(f"Error parsing account data: {str(e)}")
-
- sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address))
- if sol_balance.value is not None:
- balances['SOL'] = {
- 'name': 'SOL',
- 'address': 'SOL',
- 'amount': sol_balance.value / 1e9
- }
- else:
- logging.warning(f"SOL balance response missing for wallet: {wallet_address}")
-
- except Exception as e:
- logging.error(f"Error getting wallet balances: {str(e)}")
- logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}")
- return balances
-
-async def convert_balances_to_currency(balances , sol_price):
- converted_balances = {}
- for address, info in balances.items():
- converted_balance = info.copy() # Create a copy of the original info
- if info['name'] == 'SOL':
- converted_balance['value'] = info['amount'] * sol_price
- elif address in TOKEN_PRICES:
- converted_balance['value'] = info['amount'] * TOKEN_PRICES[address]
- else:
- converted_balance['value'] = None # Price not available
- logging.warning(f"Price not available for token {info['name']} ({address})")
- converted_balances[address] = converted_balance
- return converted_balances
async def get_swap_transaction_details(tx_signature_str):
@@ -675,201 +345,10 @@ async def get_swap_transaction_details(tx_signature_str):
return None
-
-# # # RAW Solana API RPC # # #
-
-#this is the meat of the application
-async def get_transaction_details_rpc(tx_signature, readfromDump=False):
- global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO
- try:
- if readfromDump and os.path.exists('./logs/transation_details.json'):
- with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details
- transaction_details = json.load(f)
- return transaction_details
- else:
- transaction_details = await solana_jsonrpc("getTransaction", tx_signature)
- with open('./logs/transation_details.json', 'w') as f:
- json.dump(transaction_details, f, indent=2)
-
- if transaction_details is None:
- logging.error(f"Error fetching transaction details for {tx_signature}")
- return None
-
- # Initialize default result structure
- parsed_result = {
- "order_id": None,
- "token_in": None,
- "token_out": None,
- "amount_in": 0,
- "amount_out": 0,
- "amount_in_USD": 0,
- "amount_out_USD": 0,
- "percentage_swapped": 0
- }
-
- # Extract order_id from logs
- log_messages = transaction_details.get("meta", {}).get("logMessages", [])
- for log in log_messages:
- if "order_id" in log:
- parsed_result["order_id"] = log.split(":")[2].strip()
- break
-
- # Extract token transfers from innerInstructions
- inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
- for instruction_set in inner_instructions:
- for instruction in instruction_set.get('instructions', []):
- if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked':
- info = instruction['parsed']['info']
- mint = info['mint']
- amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals
-
- # Determine which token is being swapped in and out based on zero balances
- if parsed_result["token_in"] is None and amount > 0:
- parsed_result["token_in"] = mint
- parsed_result["amount_in"] = amount
-
-
- if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
- # if we've failed to extract token_in and token_out from the transaction details, try a second method
- inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
- transfers = []
-
- for instruction_set in inner_instructions:
- for instruction in instruction_set.get('instructions', []):
- if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']:
- info = instruction['parsed']['info']
- amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount'])
- decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0
- adjusted_amount = amount / (10 ** decimals)
- # adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0))
- transfers.append({
- 'mint': info.get('mint'),
- 'amount': adjusted_amount,
- 'source': info['source'],
- 'destination': info['destination']
- })
-
- # Identify token_in and token_out
- if len(transfers) >= 2:
- parsed_result["token_in"] = transfers[0]['mint']
- parsed_result["amount_in"] = transfers[0]['amount']
- parsed_result["token_out"] = transfers[-1]['mint']
- parsed_result["amount_out"] = transfers[-1]['amount']
-
- # If mint is not provided, query the Solana network for the account data
- if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
- #for transfer in transfers:
- # do only first and last transfer
- for transfer in [transfers[0], transfers[-1]]:
- if transfer['mint'] is None:
- # Query the Solana network for the account data
- account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source'])
-
- if 'value' in account_data_result and 'data' in account_data_result['value']:
- account_data_value = account_data_result['value']
- account_data_data = account_data_value['data']
- if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
- account_data_info = account_data_data['parsed']['info']
- if 'mint' in account_data_info:
- transfer['mint'] = account_data_info['mint']
- if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]:
- await get_token_metadata_symbol(transfer['mint'])
- # get actual prices
- current_price = await get_token_prices([transfer['mint']])
-
- if parsed_result["token_in"] is None:
- parsed_result["token_in"] = transfer['mint']
- parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol']
- parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
- parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']])
- elif parsed_result["token_out"] is None:
- parsed_result["token_out"] = transfer['mint']
- parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol']
- parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
- parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price']
-
- pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', [])
- for balance in pre_balalnces:
- if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET:
- parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals']
- break
-
-
- # Calculate percentage swapped
- try:
- if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0:
- parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100
- else:
- # calculate based on total wallet value: FOLLOWED_WALLET_VALUE
- parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100
- except Exception as e:
- logging.error(f"Error calculating percentage swapped: {e}")
-
- return parsed_result
-
- except requests.exceptions.RequestException as e:
- print("Error fetching transaction details:", e)
-
-
# # # # # # # # # # Functionality # # # # # # # # # #
-async def list_initial_wallet_states():
- global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES
- global TOKENS_INFO # new
-
- followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
- your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
-
- all_token_addresses = list(set(followed_wallet_balances.keys()) |
- set(your_wallet_balances.keys()) |
- set(TOKEN_ADDRESSES.values()))
-
- TOKEN_PRICES = await get_token_prices(all_token_addresses)
- sol_price = await get_sol_price()
-
- followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price)
- your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price)
-
-
- TOKEN_ADDRESSES = {
- address: info for address,
- info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0
- }
- logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}")
-
- followed_wallet_state = []
- FOLLOWED_WALLET_VALUE = 0
- for address, info in followed_converted_balances.items():
- if info['value'] is not None and info['value'] > 0:
- followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})")
- FOLLOWED_WALLET_VALUE += info['value']
-
- your_wallet_state = []
- YOUR_WALLET_VALUE = 0
- for address, info in your_converted_balances.items():
- if info['value'] is not None and info['value'] > 0:
- your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}")
- YOUR_WALLET_VALUE += info['value']
-
- message = (
- f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n"
- f"Followed Wallet ({FOLLOWED_WALLET}):\n"
- f"{chr(10).join(followed_wallet_state)}\n"
- f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
- f"Your Wallet ({YOUR_WALLET}):\n"
- f"{chr(10).join(your_wallet_state)}\n"
- f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
- f"Monitored Tokens:\n"
- f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}"
- )
-
- logging.info(message)
- await send_telegram_message(message)
-
- # save token info to file
- await save_token_info()
def safe_get_property(info, property_name, default='Unknown'):
if not isinstance(info, dict):
@@ -887,7 +366,7 @@ async def get_transaction_details_with_retry(transaction_id, retry_delay = 8, ma
# qwery every 5 seconds for the transaction details untill not None or 30 seconds
for _ in range(max_retries):
try:
- tx_details = await get_transaction_details_rpc(transaction_id)
+ tx_details = await solanaAPI.get_transaction_details_rpc(transaction_id)
if tx_details is not None:
break
except Exception as e:
@@ -1015,13 +494,13 @@ async def process_log(log_result):
f"Amount In USD: {tr_details['amount_in_USD']:.2f}\n"
f"Percentage Swapped: {tr_details['percentage_swapped']:.2f}%"
)
- await send_telegram_message(message_text)
+ await telegram_utils.send_telegram_message(message_text)
await follow_move(tr_details)
await save_token_info()
except Exception as e:
logging.error(f"Error aquiring log details and following: {e}")
- await send_telegram_message(f"Not followed! Error following move.")
+ await telegram_utils.send_telegram_message(f"Not followed! Error following move.")
@@ -1072,14 +551,14 @@ def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
async def follow_move(move):
- your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
+ your_balances = await solanaAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
if your_balance_info is not None:
# Use the balance
print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
else:
print("No ballance found for {move['symbol_in']}. Skipping move.")
- await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
+ await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
return
your_balance = your_balance_info['amount']
@@ -1087,12 +566,12 @@ async def follow_move(move):
token_info = TOKENS_INFO.get(move['token_in'])
token_name_in = token_info.get('symbol') or await get_token_metadata(move['token_in'])
- token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await get_token_metadata_symbol(move['token_out'])
+ token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await solanaAPI.get_token_metadata_symbol(move['token_out'])
if not your_balance:
msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move."
logging.warning(msg)
- await send_telegram_message(msg)
+ await telegram_utils.send_telegram_message(msg)
return
# move["percentage_swapped"] = (move["amount_out"] / move["amount_in"]) * 100
@@ -1119,7 +598,7 @@ async def follow_move(move):
f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway."
)
logging.warning(msg)
- await send_telegram_message(msg)
+ await telegram_utils.send_telegram_message(msg)
try:
try:
@@ -1129,7 +608,7 @@ async def follow_move(move):
)
# logging.info(notification)
# error_logger.info(notification)
- # await send_telegram_message(notification)
+ # await telegram_utils.send_telegram_message(notification)
except Exception as e:
logging.error(f"Error sending notification: {e}")
@@ -1159,7 +638,7 @@ async def follow_move(move):
# append to notification
notification += f"\n\nTransaction: {transaction_id}"
- await send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
+ await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
tx_details = await get_transaction_details_with_retry(transaction_id)
if tx_details is not None:
@@ -1173,7 +652,7 @@ async def follow_move(move):
# log the errors to /logs/errors.log
error_logger.error(error_message)
error_logger.exception(e)
- await send_telegram_message(error_message)
+ await telegram_utils.send_telegram_message(error_message)
amount = amount * 0.75
await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
@@ -1198,7 +677,7 @@ async def follow_move(move):
f"\n\nTransaction: {transaction_id}"
)
logging.info(notification)
- await send_telegram_message(notification)
+ await telegram_utils.send_telegram_message(notification)
except Exception as e:
logging.error(f"Error sending notification: {e}")
@@ -1210,9 +689,9 @@ async def follow_move(move):
error_logger.exception(e) \
# if error_message contains 'Program log: Error: insufficient funds'
if 'insufficient funds' in error_message:
- await send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.")
+ await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.")
else:
- await send_telegram_message(error_message)
+ await telegram_utils.send_telegram_message(error_message)
# Helper functions
@@ -1236,41 +715,45 @@ async def check_PK():
if not pk:
logging.error("Private key not found in environment variables. Will not be able to sign transactions.")
# send TG warning message
- await send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.")
+ await telegram_utils.send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.")
+solanaAPI = SolanaAPI(process_transaction_callback=process_log)
+
async def main():
- global bot, PROCESSING_LOG
+ global solanaAPI, bot, PROCESSING_LOG
- await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
+ await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")
await check_PK()
# new: restart wallet_watch_loop every hour
- while True:
- wallet_watch_task = asyncio.create_task(wallet_watch_loop())
+ await solanaAPI.wallet_watch_loop()
+
+ # while True:
+ # wallet_watch_task = asyncio.create_task(solanaAPI.wallet_watch_loop())
- try:
- # Wait for an hour or until the task completes, whichever comes first
- await asyncio.wait_for(wallet_watch_task, timeout=3600)
- except asyncio.TimeoutError:
- # If an hour has passed, cancel the task if not PROCESSING
- if PROCESSING_LOG:
- logging.info("wallet_watch_loop is processing logs. Will not restart.")
- await send_telegram_message("wallet_watch_loop is processing logs. Will not restart.")
- else:
- wallet_watch_task.cancel()
- try:
- await wallet_watch_task
- except asyncio.CancelledError:
- logging.info("wallet_watch_loop was cancelled after running for an hour")
- except Exception as e:
- logging.error(f"Error in wallet_watch_loop: {str(e)}")
- await send_telegram_message(f"Error in wallet_watch_loop: {str(e)}")
+ # try:
+ # # Wait for an hour or until the task completes, whichever comes first
+ # await asyncio.wait_for(wallet_watch_task, timeout=3600)
+ # except asyncio.TimeoutError:
+ # # If an hour has passed, cancel the task if not PROCESSING
+ # if PROCESSING_LOG:
+ # logging.info("wallet_watch_loop is processing logs. Will not restart.")
+ # await telegram_utils.send_telegram_message("wallet_watch_loop is processing logs. Will not restart.")
+ # else:
+ # wallet_watch_task.cancel()
+ # try:
+ # await wallet_watch_task
+ # except asyncio.CancelledError:
+ # logging.info("wallet_watch_loop was cancelled after running for an hour")
+ # except Exception as e:
+ # logging.error(f"Error in wallet_watch_loop: {str(e)}")
+ # await telegram_utils.send_telegram_message(f"Error in wallet_watch_loop: {str(e)}")
- logging.info("Restarting wallet_watch_loop")
- await send_telegram_message("Restarting wallet_watch_loop")
+ # logging.info("Restarting wallet_watch_loop")
+ # await telegram_utils.send_telegram_message("Restarting wallet_watch_loop")
diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py
index bc19389..e0fa5cb 100644
--- a/crypto/sol/modules/SolanaAPI.py
+++ b/crypto/sol/modules/SolanaAPI.py
@@ -1,5 +1,7 @@
import sys
import os
+
+import aiohttp
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
@@ -7,23 +9,21 @@ import json
import logging
import random
import websockets
-from typing import Optional
+from typing import Dict, List, Optional
import requests
-import datetime
+from datetime import datetime
+from solana.rpc.types import TokenAccountOpts, TxOpts
logger = logging.getLogger(__name__)
SOLANA_ENDPOINTS = [
"wss://api.mainnet-beta.solana.com",
- # "wss://solana-api.projectserum.com",
- # "wss://rpc.ankr.com/solana",
- # "wss://mainnet.rpcpool.com",
]
PING_INTERVAL = 30
-SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes
+SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 1 minute
from config import (
-FOLLOWED_WALLET, SOLANA_HTTP_URL
+ FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY
)
from modules.utils import telegram_utils
@@ -106,85 +106,60 @@ class SolanaWS:
async def process_messages(self):
while True:
message = await self.message_queue.get()
- await self.on_message(message)
+ if self.on_message:
+ await self.on_message(message)
logger.info(f"Received message: {message}")
async def close(self):
if self.websocket:
await self.websocket.close()
logger.info("WebSocket connection closed")
- while True:
- message = await self.message_queue.get()
- try:
- response_data = json.loads(message)
- if 'params' in response_data:
- log = response_data['params']['result']
- await process_log(log)
- else:
- logger.warning(f"Unexpected response: {response_data}")
- except json.JSONDecodeError as e:
- logger.error(f"Failed to decode JSON: {e}")
- except Exception as e:
- logger.error(f"An unexpected error occurred while processing message: {e}")
- finally:
- self.message_queue.task_done()
+ async def solana_jsonrpc(method, params=None, jsonParsed=True):
+ if not isinstance(params, list):
+ params = [params] if params is not None else []
-async def solana_jsonrpc(method, params = None, jsonParsed = True):
- # target json example:
- # data = {
- # "jsonrpc": "2.0",
- # "id": 1,
- # "method": "getTransaction",
- # "params": [
- # tx_signature,
- # {
- # "encoding": "jsonParsed",
- # "maxSupportedTransactionVersion": 0
- # }
- # ]
- # }
- # if param is not array, make it array
- if not isinstance(params, list):
- params = [params]
+ data = {
+ "jsonrpc": "2.0",
+ "id": 1,
+ "method": method,
+ "params": params
+ }
+
+ if jsonParsed:
+ data["params"].append({"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0})
+ else:
+ data["params"].append({"maxSupportedTransactionVersion": 0})
- data = {
- "jsonrpc": "2.0",
- "id": 1,
- "method": method,
- "params": params or []
- }
- data["params"].append({"maxSupportedTransactionVersion": 0})
- if jsonParsed:
- data["params"][1]["encoding"] = "jsonParsed"
-
-
- try:
- # url = 'https://solana.drpc.org'
- response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data))
- response.raise_for_status() # Raises an error for bad responses
- result = response.json()
- if not 'result' in result or 'error' in result:
- print("Error fetching data from Solana RPC:", result)
+ try:
+ response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data))
+ response.raise_for_status()
+ result = response.json()
+ if 'result' not in result or 'error' in result:
+ logger.error("Error fetching data from Solana RPC:", result)
+ return None
+ return result['result']
+ except Exception as e:
+ logger.error(f"Error fetching data from Solana RPC: {e}")
return None
- return result['result']
- except Exception as e:
- logging.error(f"Error fetching data from Solana RPC: {e}")
- return None
-
-class SolanaAPI:
-
- def __init__(self, process_log_callback, send_telegram_message_callback, list_initial_wallet_states_callback):
- self.process_log = process_log_callback
- self.list_initial_wallet_states = list_initial_wallet_states_callback
+
+class SolanaAPI:
+ def __init__(self, process_transaction_callback, on_initial_subscription_callback = None, on_bot_message=None):
+ self.process_transaction = process_transaction_callback
+ self.on_initial_subscription = on_initial_subscription_callback
+ self.on_bot_message = on_bot_message,
+
+ self.dex = SolanaDEX(DISPLAY_CURRENCY)
+ self.solana_ws = SolanaWS(on_message=self.process_transaction)
async def process_messages(self, solana_ws):
while True:
message = await solana_ws.message_queue.get()
- await self.process_log(message)
-
- async def wallet_watch_loop():
- solana_ws = SolanaWS(on_message=process_log)
+ await self.process_transaction(message)
+
+ async def wallet_watch_loop(self):
+
+ solana_ws = SolanaWS(on_message=self.process_transaction)
first_subscription = True
while True:
@@ -193,10 +168,10 @@ class SolanaAPI:
await solana_ws.subscribe()
if first_subscription:
- asyncio.create_task(self.list_initial_wallet_states())
+ asyncio.create_task(self.on_initial_subscription())
first_subscription = False
- await telegram_utils.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...")
+ await self.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...")
receive_task = asyncio.create_task(solana_ws.receive_messages())
process_task = asyncio.create_task(solana_ws.process_messages())
@@ -214,20 +189,15 @@ class SolanaAPI:
finally:
await solana_ws.unsubscribe()
if solana_ws.websocket:
- await solana_ws.websocket.close()
- await telegram_utils.send_telegram_message("Reconnecting...")
+ await solana_ws.close()
+ await self.send_telegram_message("Reconnecting...")
await asyncio.sleep(5)
-
- async def process_transaction(signature):
- # Implement your logic to process each transaction
+
+ async def process_transaction(self, signature):
print(f"Processing transaction: {signature['signature']}")
- # You can add more processing logic here, such as storing in a database,
- # triggering notifications, etc.
- # Example usage
- # async def main():
- # account_address = "Vote111111111111111111111111111111111111111"
-
- async def get_last_transactions(account_address, check_interval=300, limit=1000):
+ # Add your transaction processing logic here
+
+ async def get_last_transactions(self, account_address, check_interval=300, limit=1000):
last_check_time = None
last_signature = None
@@ -252,17 +222,521 @@ class SolanaAPI:
if last_signature and signature['signature'] == last_signature:
break
- # Process the transaction
- await process_transaction(signature)
+ await self.process_transaction(signature)
if result:
last_signature = result[0]['signature']
last_check_time = current_time
- await asyncio.sleep(1) # Sleep for 1 second before checking again
+ await asyncio.sleep(1)
+
+ async def get_token_metadata_symbol(mint_address):
+ global TOKENS_INFO
+
+ if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]:
+ return TOKENS_INFO[mint_address].get('symbol')
+
+ try:
+ account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", mint_address)
+ if 'value' in account_data_result and 'data' in account_data_result['value']:
+ account_data_data = account_data_result['value']['data']
+ if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
+ account_data_info = account_data_data['parsed']['info']
+ if 'decimals' in account_data_info:
+ if mint_address in TOKENS_INFO:
+ TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals']
+ else:
+ TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']}
+ if 'tokenName' in account_data_info:
+ if mint_address in TOKENS_INFO:
+ TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName']
+ else:
+ TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']}
+
+ metadata = await get_token_metadata(mint_address)
+ if metadata:
+ if mint_address in TOKENS_INFO:
+ TOKENS_INFO[mint_address].update(metadata)
+ else:
+ TOKENS_INFO[mint_address] = metadata
+ await save_token_info()
+ # TOKENS_INFO[mint_address] = metadata
+ # return metadata.get('symbol') or metadata.get('name')
+ return TOKENS_INFO[mint_address].get('symbol')
+ except Exception as e:
+ logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
+ return None
+
+ async def get_transaction_details_rpc(tx_signature, readfromDump=False):
+ global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO
+ try:
+ if readfromDump and os.path.exists('./logs/transation_details.json'):
+ with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details
+ transaction_details = json.load(f)
+ return transaction_details
+ else:
+ transaction_details = await solana_jsonrpc("getTransaction", tx_signature)
+ with open('./logs/transation_details.json', 'w') as f:
+ json.dump(transaction_details, f, indent=2)
+
+ if transaction_details is None:
+ logging.error(f"Error fetching transaction details for {tx_signature}")
+ return None
+
+ # Initialize default result structure
+ parsed_result = {
+ "order_id": None,
+ "token_in": None,
+ "token_out": None,
+ "amount_in": 0,
+ "amount_out": 0,
+ "amount_in_USD": 0,
+ "amount_out_USD": 0,
+ "percentage_swapped": 0
+ }
+
+ # Extract order_id from logs
+ log_messages = transaction_details.get("meta", {}).get("logMessages", [])
+ for log in log_messages:
+ if "order_id" in log:
+ parsed_result["order_id"] = log.split(":")[2].strip()
+ break
+
+ # Extract token transfers from innerInstructions
+ inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
+ for instruction_set in inner_instructions:
+ for instruction in instruction_set.get('instructions', []):
+ if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked':
+ info = instruction['parsed']['info']
+ mint = info['mint']
+ amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals
+
+ # Determine which token is being swapped in and out based on zero balances
+ if parsed_result["token_in"] is None and amount > 0:
+ parsed_result["token_in"] = mint
+ parsed_result["amount_in"] = amount
+
+
+ if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
+ # if we've failed to extract token_in and token_out from the transaction details, try a second method
+ inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
+ transfers = []
+
+ for instruction_set in inner_instructions:
+ for instruction in instruction_set.get('instructions', []):
+ if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']:
+ info = instruction['parsed']['info']
+ amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount'])
+ decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0
+ adjusted_amount = amount / (10 ** decimals)
+ # adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0))
+ transfers.append({
+ 'mint': info.get('mint'),
+ 'amount': adjusted_amount,
+ 'source': info['source'],
+ 'destination': info['destination']
+ })
+
+ # Identify token_in and token_out
+ if len(transfers) >= 2:
+ parsed_result["token_in"] = transfers[0]['mint']
+ parsed_result["amount_in"] = transfers[0]['amount']
+ parsed_result["token_out"] = transfers[-1]['mint']
+ parsed_result["amount_out"] = transfers[-1]['amount']
+
+ # If mint is not provided, query the Solana network for the account data
+ if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
+ #for transfer in transfers:
+ # do only first and last transfer
+ for transfer in [transfers[0], transfers[-1]]:
+ if transfer['mint'] is None:
+ # Query the Solana network for the account data
+ account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source'])
+
+ if 'value' in account_data_result and 'data' in account_data_result['value']:
+ account_data_value = account_data_result['value']
+ account_data_data = account_data_value['data']
+ if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
+ account_data_info = account_data_data['parsed']['info']
+ if 'mint' in account_data_info:
+ transfer['mint'] = account_data_info['mint']
+ if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]:
+ await get_token_metadata_symbol(transfer['mint'])
+ # get actual prices
+ current_price = await get_token_prices([transfer['mint']])
+
+ if parsed_result["token_in"] is None:
+ parsed_result["token_in"] = transfer['mint']
+ parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol']
+ parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
+ parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']])
+ elif parsed_result["token_out"] is None:
+ parsed_result["token_out"] = transfer['mint']
+ parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol']
+ parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
+ parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price']
+
+ pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', [])
+ for balance in pre_balalnces:
+ if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET:
+ parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals']
+ break
+
+
+ # Calculate percentage swapped
+ try:
+ if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0:
+ parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100
+ else:
+ # calculate based on total wallet value: FOLLOWED_WALLET_VALUE
+ parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100
+ except Exception as e:
+ logging.error(f"Error calculating percentage swapped: {e}")
+
+ return parsed_result
+
+ except requests.exceptions.RequestException as e:
+ print("Error fetching transaction details:", e)
+
-if __name__ == "__main__":
- asyncio.run(wallet_watch_loop())
\ No newline at end of file
+
+class SolanaDEX:
+ def __init__(self, DISPLAY_CURRENCY):
+ self.DISPLAY_CURRENCY = DISPLAY_CURRENCY
+ pass
+
+ async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
+ global TOKENS_INFO
+
+ # Skip for USD
+ prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"}
+ remaining_tokens = [addr for addr in token_addresses if addr not in prices]
+
+ # Try CoinGecko
+ coingecko_prices = await get_prices_from_coingecko(remaining_tokens)
+ prices.update(coingecko_prices)
+
+
+ # For remaining missing tokens, try Jupiter
+ missing_tokens = set(remaining_tokens) - set(prices.keys())
+ if missing_tokens:
+ jupiter_prices = await get_prices_from_jupiter(list(missing_tokens))
+ prices.update(jupiter_prices)
+
+
+ # For tokens not found in CoinGecko, use DexScreener
+ missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys())
+ if missing_tokens:
+ dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens))
+ prices.update(dexscreener_prices)
+
+ # For remaining missing tokens, try Raydium
+ missing_tokens = set(remaining_tokens) - set(prices.keys())
+ if missing_tokens:
+ raydium_prices = await get_prices_from_raydium(list(missing_tokens))
+ prices.update(raydium_prices)
+
+ # For remaining missing tokens, try Orca
+ missing_tokens = set(remaining_tokens) - set(prices.keys())
+ if missing_tokens:
+ orca_prices = await get_prices_from_orca(list(missing_tokens))
+ prices.update(orca_prices)
+
+ # If any tokens are still missing, set their prices to 0
+ for token in set(token_addresses) - set(prices.keys()):
+ prices[token] = 0.0
+ logging.warning(f"Price not found for token {token}. Setting to 0.")
+
+ for token, price in prices.items():
+ token_info = TOKENS_INFO.setdefault(token, {})
+ if 'symbol' not in token_info:
+ token_info['symbol'] = await get_token_metadata_symbol(token)
+ token_info['price'] = price
+
+ return prices
+
+ async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
+ base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
+ prices = {}
+
+ async def fetch_single_price(session, address):
+ params = {
+ "contract_addresses": address,
+ "vs_currencies": DISPLAY_CURRENCY.lower()
+ }
+ try:
+ async with session.get(base_url, params=params) as response:
+ if response.status == 200:
+ data = await response.json()
+ if address in data and DISPLAY_CURRENCY.lower() in data[address]:
+ return address, data[address][DISPLAY_CURRENCY.lower()]
+ else:
+ logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}")
+ except Exception as e:
+ logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}")
+ return address, None
+
+ async with aiohttp.ClientSession() as session:
+ tasks = [fetch_single_price(session, address) for address in token_addresses]
+ results = await asyncio.gather(*tasks)
+
+ for address, price in results:
+ if price is not None:
+ prices[address] = price
+
+ return prices
+
+ async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
+ base_url = "https://api.dexscreener.com/latest/dex/tokens/"
+ prices = {}
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
+ results = await asyncio.gather(*tasks)
+
+ for address, result in zip(token_addresses, results):
+ if result and 'pairs' in result and result['pairs']:
+ pair = result['pairs'][0] # Use the first pair (usually the most liquid)
+ prices[address] = float(pair['priceUsd'])
+ else:
+ logging.warning(f"No price data found on DexScreener for token {address}")
+ except Exception as e:
+ logging.error(f"Error fetching token prices from DexScreener: {str(e)}")
+
+ return prices
+
+ async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]:
+ url = "https://price.jup.ag/v4/price"
+ params = {
+ "ids": ",".join(token_addresses)
+ }
+ prices = {}
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, params=params) as response:
+ if response.status == 200:
+ data = await response.json()
+ for address, price_info in data.get('data', {}).items():
+ if 'price' in price_info:
+ prices[address] = float(price_info['price'])
+ else:
+ logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}")
+ except Exception as e:
+ logging.error(f"Error fetching token prices from Jupiter: {str(e)}")
+ return prices
+
+ # New function for Raydium
+ async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]:
+ url = "https://api.raydium.io/v2/main/price"
+ prices = {}
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url) as response:
+ if response.status == 200:
+ data = await response.json()
+ for address in token_addresses:
+ if address in data:
+ prices[address] = float(data[address])
+ else:
+ logging.error(f"Failed to get token prices from Raydium. Status: {response.status}")
+ except Exception as e:
+ logging.error(f"Error fetching token prices from Raydium: {str(e)}")
+ return prices
+
+ # New function for Orca
+ async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]:
+ url = "https://api.orca.so/allTokens"
+ prices = {}
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url) as response:
+ if response.status == 200:
+ data = await response.json()
+ for token_info in data:
+ if token_info['mint'] in token_addresses:
+ prices[token_info['mint']] = float(token_info['price'])
+ else:
+ logging.error(f"Failed to get token prices from Orca. Status: {response.status}")
+ except Exception as e:
+ logging.error(f"Error fetching token prices from Orca: {str(e)}")
+ return prices
+
+ async def fetch_token_data(session, url):
+ try:
+ async with session.get(url) as response:
+ if response.status == 200:
+ return await response.json()
+ else:
+ logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
+ return None
+ except Exception as e:
+ logging.error(f"Error fetching data from {url}: {str(e)}")
+ return None
+
+ async def get_sol_price() -> float:
+ sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address
+ return await get_token_prices([sol_address]).get(sol_address, 0.0)
+
+ async def get_wallet_balances(wallet_address, doGetTokenName=True):
+ balances = {}
+ logging.info(f"Getting balances for wallet: {wallet_address}")
+ global TOKENS_INFO
+ try:
+ response = await solana_client.get_token_accounts_by_owner_json_parsed(
+ Pubkey.from_string(wallet_address),
+ opts=TokenAccountOpts(
+ program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
+ ),
+ commitment=Confirmed
+ )
+
+ if response.value:
+ for account in response.value:
+ try:
+ parsed_data = account.account.data.parsed
+ if isinstance(parsed_data, dict) and 'info' in parsed_data:
+ info = parsed_data['info']
+ if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info:
+ mint = info['mint']
+ decimals = info['tokenAmount']['decimals']
+ amount = float(info['tokenAmount']['amount'])/10**decimals
+ if amount > 0:
+ if mint in TOKENS_INFO:
+ token_name = TOKENS_INFO[mint].get('symbol')
+ elif doGetTokenName:
+ token_name = await get_token_metadata_symbol(mint) or 'N/A'
+ # sleep for 1 second to avoid rate limiting
+ await asyncio.sleep(2)
+
+ TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals)
+ TOKENS_INFO[mint]['decimals'] = decimals
+ balances[mint] = {
+ 'name': token_name or 'N/A',
+ 'address': mint,
+ 'amount': amount,
+ 'decimals': decimals
+ }
+ # sleep for 1 second to avoid rate limiting
+ logging.debug(f"Account balance for {token_name} ({mint}): {amount}")
+ else:
+ logging.warning(f"Unexpected data format for account: {account}")
+ except Exception as e:
+ logging.error(f"Error parsing account data: {str(e)}")
+
+ sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address))
+ if sol_balance.value is not None:
+ balances['SOL'] = {
+ 'name': 'SOL',
+ 'address': 'SOL',
+ 'amount': sol_balance.value / 1e9
+ }
+ else:
+ logging.warning(f"SOL balance response missing for wallet: {wallet_address}")
+
+ except Exception as e:
+ logging.error(f"Error getting wallet balances: {str(e)}")
+ logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}")
+ return balances
+
+ async def convert_balances_to_currency(balances , sol_price):
+ converted_balances = {}
+ for address, info in balances.items():
+ converted_balance = info.copy() # Create a copy of the original info
+ if info['name'] == 'SOL':
+ converted_balance['value'] = info['amount'] * sol_price
+ elif address in TOKEN_PRICES:
+ converted_balance['value'] = info['amount'] * TOKEN_PRICES[address]
+ else:
+ converted_balance['value'] = None # Price not available
+ logging.warning(f"Price not available for token {info['name']} ({address})")
+ converted_balances[address] = converted_balance
+ return converted_balances
+
+
+ async def list_initial_wallet_states():
+ global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES
+ global TOKENS_INFO # new
+
+ followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
+ your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
+
+ all_token_addresses = list(set(followed_wallet_balances.keys()) |
+ set(your_wallet_balances.keys()) |
+ set(TOKEN_ADDRESSES.values()))
+
+ TOKEN_PRICES = await get_token_prices(all_token_addresses)
+ sol_price = await get_sol_price()
+
+ followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price)
+ your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price)
+
+
+ TOKEN_ADDRESSES = {
+ address: info for address,
+ info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0
+ }
+ logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}")
+
+ followed_wallet_state = []
+ FOLLOWED_WALLET_VALUE = 0
+ for address, info in followed_converted_balances.items():
+ if info['value'] is not None and info['value'] > 0:
+ followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})")
+ FOLLOWED_WALLET_VALUE += info['value']
+
+ your_wallet_state = []
+ YOUR_WALLET_VALUE = 0
+ for address, info in your_converted_balances.items():
+ if info['value'] is not None and info['value'] > 0:
+ your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}")
+ YOUR_WALLET_VALUE += info['value']
+
+ message = (
+ f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n"
+ f"Followed Wallet ({FOLLOWED_WALLET}):\n"
+ f"{chr(10).join(followed_wallet_state)}\n"
+ f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
+ f"Your Wallet ({YOUR_WALLET}):\n"
+ f"{chr(10).join(your_wallet_state)}\n"
+ f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
+ f"Monitored Tokens:\n"
+ f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}"
+ )
+
+ logging.info(message)
+ await telegram_utils.send_telegram_message(message)
+
+ # save token info to file
+ await save_token_info()
+
+
+
+#example
+# async def main():
+# await telegram_utils.initialize()
+
+# async def process_log(log):
+# print(f"Processing log: {log}")
+
+# async def list_initial_wallet_states():
+# print("Listing initial wallet states")
+
+
+# wallet_watch_task = asyncio.create_task(solana_api.wallet_watch_loop())
+
+# try:
+# await asyncio.gather(wallet_watch_task)
+# except asyncio.CancelledError:
+# pass
+# finally:
+# await telegram_utils.close()
+
+# if __name__ == "__main__":
+# asyncio.run(main())
\ No newline at end of file
diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py
index b70b567..6ed7ab7 100644
--- a/crypto/sol/modules/utils.py
+++ b/crypto/sol/modules/utils.py
@@ -28,7 +28,7 @@ class TelegramUtils:
await self.initialize()
try:
- await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML)
+ # await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML)
logging.info(f"Telegram message sent: {message}")
except Exception as e:
logging.error(f"Error sending Telegram message: {str(e)}")