diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 587da77..4ffa7b8 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -432,9 +432,9 @@ async def process_messages(websocket): pk = None - +app = init_app() # Convert Flask app to ASGI -asgi_app = WsgiToAsgi(init_app) +asgi_app = WsgiToAsgi(app) async def main(): global solanaAPI, bot, PROCESSING_LOG, pk diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index 7b22ca4..3ee08a7 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -53,10 +53,10 @@ PING_INTERVAL = 30 SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 1 minute from config import ( - FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS + FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS, YOUR_WALLET ) -from modules.utils import telegram_utils +from modules.utils import telegram_utils, async_safe_call # Use the production Solana RPC endpoint solana_client = AsyncClient(SOLANA_HTTP_URL) @@ -124,8 +124,10 @@ class SolanaWS: ] # define onmessage as inline callback to get subscription_id which waits for last_msg_responded # self.on_message = lambda message: self.subscription_id = message.get('result') - result = await self.ws_jsonrpc("logsSubscribe", params) - + await self.ws_jsonrpc("logsSubscribe", params, False) + await self.receive_messages(True) + result = await self.process_messages(True) + if result is not None and result > 0: self.subscription_id = result logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") @@ -143,11 +145,17 @@ class SolanaWS: else: logger.error(f"Failed to unsubscribe from subscription id: {self.subscription_id}") - async def receive_messages(self): + async def receive_messages(self, one = False): while True: try: - message = await self.websocket.recv() - await self.message_queue.put(message) + response = await self.websocket.recv() + response_data = json.loads(response) + self.last_msg_responded = True + + if 'result' in response_data: + await self.message_queue.put(response_data['result']) + if one: + break except websockets.exceptions.ConnectionClosedError: logger.error("WebSocket connection closed") break @@ -155,12 +163,15 @@ class SolanaWS: logger.error(f"Error receiving message: {e}") break - async def process_messages(self): + async def process_messages(self, one = False): while True: message = await self.message_queue.get() + message = json.loads(message) if self.on_message: await self.on_message(message) logger.info(f"Received message: {message}") + if one: + return message async def close(self): if self.websocket: @@ -206,7 +217,7 @@ class SolanaAPI: # Use the provided on_bot_message if it's callable, otherwise use the default self.on_bot_message = on_bot_message if callable(on_bot_message) else default_on_bot_message - self.dex = SolanaDEX(DISPLAY_CURRENCY) + self.dex = DEX self.solana_ws = SolanaWS(on_message=self.process_transaction) async def process_messages(self, solana_ws): @@ -226,13 +237,13 @@ class SolanaAPI: await solana_ws.connect() await solana_ws.subscribe() - if first_subscription and self.on_initial_subscription is not None: - await self.on_initial_subscription + if first_subscription: + await async_safe_call( self.on_initial_subscription, solana_ws.subscription_id) first_subscription = False - self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") + await async_safe_call(self.on_bot_message,f"Solana mainnet connected ({solana_ws.subscription_id})...") - receive_task = asyncio.create_task(solana_ws.receive_messages()) + receive_task = asyncio.create_task(solana_ws.receive_messages()) process_task = asyncio.create_task(solana_ws.process_messages()) try: @@ -249,8 +260,7 @@ class SolanaAPI: await solana_ws.unsubscribe() if solana_ws.websocket: await solana_ws.close() - if self.on_bot_message: - await self.on_bot_message("Reconnecting...") + await async_safe_call(self.on_bot_message,"Reconnecting...") await asyncio.sleep(5) async def get_last_transactions(self, account_address, check_interval=300, limit=1000): @@ -559,75 +569,69 @@ class SolanaAPI: class SolanaDEX: - def __init__(self, DISPLAY_CURRENCY): + def __init__(self, DISPLAY_CURRENCY: str): self.DISPLAY_CURRENCY = DISPLAY_CURRENCY - pass - - async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: - global TOKENS_INFO - - # Skip for USD + self.solana_client = AsyncClient("https://api.mainnet-beta.solana.com") + self.TOKENS_INFO = {} + self.TOKEN_PRICES = {} + self.TOKEN_ADDRESSES = {} + self.FOLLOWED_WALLET_VALUE = 0 + self.YOUR_WALLET_VALUE = 0 + + async def get_token_prices(self, token_addresses: List[str]) -> Dict[str, float]: prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"} remaining_tokens = [addr for addr in token_addresses if addr not in prices] - # Try CoinGecko coingecko_prices = await self.get_prices_from_coingecko(remaining_tokens) prices.update(coingecko_prices) - - # For remaining missing tokens, try Jupiter missing_tokens = set(remaining_tokens) - set(prices.keys()) if missing_tokens: - jupiter_prices = await get_prices_from_jupiter(list(missing_tokens)) + jupiter_prices = await self.get_prices_from_jupiter(list(missing_tokens)) prices.update(jupiter_prices) - - # For tokens not found in CoinGecko, use DexScreener missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys()) if missing_tokens: - dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens)) + dexscreener_prices = await self.get_prices_from_dexscreener(list(missing_tokens)) prices.update(dexscreener_prices) - # For remaining missing tokens, try Raydium missing_tokens = set(remaining_tokens) - set(prices.keys()) if missing_tokens: - raydium_prices = await get_prices_from_raydium(list(missing_tokens)) + raydium_prices = await self.get_prices_from_raydium(list(missing_tokens)) prices.update(raydium_prices) - # For remaining missing tokens, try Orca missing_tokens = set(remaining_tokens) - set(prices.keys()) if missing_tokens: - orca_prices = await get_prices_from_orca(list(missing_tokens)) + orca_prices = await self.get_prices_from_orca(list(missing_tokens)) prices.update(orca_prices) - # If any tokens are still missing, set their prices to 0 for token in set(token_addresses) - set(prices.keys()): prices[token] = 0.0 logging.warning(f"Price not found for token {token}. Setting to 0.") for token, price in prices.items(): - token_info = TOKENS_INFO.setdefault(token, {}) + token_info = self.TOKENS_INFO.setdefault(token, {}) if 'symbol' not in token_info: - token_info['symbol'] = await get_token_metadata_symbol(token) + token_info['symbol'] = await self.get_token_metadata_symbol(token) token_info['price'] = price return prices - async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]: + async def get_prices_from_coingecko(self, token_addresses: List[str]) -> Dict[str, float]: base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana" prices = {} async def fetch_single_price(session, address): params = { "contract_addresses": address, - "vs_currencies": DISPLAY_CURRENCY.lower() + "vs_currencies": self.DISPLAY_CURRENCY.lower() } try: async with session.get(base_url, params=params) as response: if response.status == 200: data = await response.json() - if address in data and DISPLAY_CURRENCY.lower() in data[address]: - return address, data[address][DISPLAY_CURRENCY.lower()] + if address in data and self.DISPLAY_CURRENCY.lower() in data[address]: + return address, data[address][self.DISPLAY_CURRENCY.lower()] else: logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}") except Exception as e: @@ -644,13 +648,13 @@ class SolanaDEX: return prices - async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]: + async def get_prices_from_dexscreener(self, token_addresses: List[str]) -> Dict[str, float]: base_url = "https://api.dexscreener.com/latest/dex/tokens/" prices = {} try: async with aiohttp.ClientSession() as session: - tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] + tasks = [self.fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] results = await asyncio.gather(*tasks) for address, result in zip(token_addresses, results): @@ -664,7 +668,7 @@ class SolanaDEX: return prices - async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]: + async def get_prices_from_jupiter(self, token_addresses: List[str]) -> Dict[str, float]: url = "https://price.jup.ag/v4/price" params = { "ids": ",".join(token_addresses) @@ -685,8 +689,7 @@ class SolanaDEX: logging.error(f"Error fetching token prices from Jupiter: {str(e)}") return prices - # New function for Raydium - async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]: + async def get_prices_from_raydium(self, token_addresses: List[str]) -> Dict[str, float]: url = "https://api.raydium.io/v2/main/price" prices = {} @@ -704,8 +707,7 @@ class SolanaDEX: logging.error(f"Error fetching token prices from Raydium: {str(e)}") return prices - # New function for Orca - async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]: + async def get_prices_from_orca(self, token_addresses: List[str]) -> Dict[str, float]: url = "https://api.orca.so/allTokens" prices = {} @@ -723,6 +725,7 @@ class SolanaDEX: logging.error(f"Error fetching token prices from Orca: {str(e)}") return prices + @staticmethod async def fetch_token_data(session, url): try: async with session.get(url) as response: @@ -735,17 +738,16 @@ class SolanaDEX: logging.error(f"Error fetching data from {url}: {str(e)}") return None - async def get_sol_price() -> float: + async def get_sol_price(self) -> float: sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address - return await get_token_prices([sol_address]).get(sol_address, 0.0) + prices = await self.get_token_prices([sol_address]) + return prices.get(sol_address, 0.0) - - async def get_wallet_balances(wallet_address, doGetTokenName=True): + async def get_wallet_balances(self, wallet_address, doGetTokenName=True): balances = {} logging.info(f"Getting balances for wallet: {wallet_address}") - global TOKENS_INFO try: - response = await solana_client.get_token_accounts_by_owner_json_parsed( + response = await self.solana_client.get_token_accounts_by_owner_json_parsed( Pubkey.from_string(wallet_address), opts=TokenAccountOpts( program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") @@ -764,29 +766,27 @@ class SolanaDEX: decimals = info['tokenAmount']['decimals'] amount = float(info['tokenAmount']['amount'])/10**decimals if amount > 0: - if mint in TOKENS_INFO: - token_name = TOKENS_INFO[mint].get('symbol') + if mint in self.TOKENS_INFO: + token_name = self.TOKENS_INFO[mint].get('symbol') elif doGetTokenName: - token_name = await get_token_metadata_symbol(mint) or 'N/A' - # sleep for 1 second to avoid rate limiting + token_name = await self.get_token_metadata_symbol(mint) or 'N/A' await asyncio.sleep(2) - TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals) - TOKENS_INFO[mint]['decimals'] = decimals + self.TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals) + self.TOKENS_INFO[mint]['decimals'] = decimals balances[mint] = { 'name': token_name or 'N/A', 'address': mint, 'amount': amount, 'decimals': decimals } - # sleep for 1 second to avoid rate limiting logging.debug(f"Account balance for {token_name} ({mint}): {amount}") else: logging.warning(f"Unexpected data format for account: {account}") except Exception as e: logging.error(f"Error parsing account data: {str(e)}") - sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) + sol_balance = await self.solana_client.get_balance(Pubkey.from_string(wallet_address)) if sol_balance.value is not None: balances['SOL'] = { 'name': 'SOL', @@ -801,80 +801,87 @@ class SolanaDEX: logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") return balances - async def convert_balances_to_currency(balances , sol_price): + async def convert_balances_to_currency(self, balances, sol_price): converted_balances = {} for address, info in balances.items(): - converted_balance = info.copy() # Create a copy of the original info + converted_balance = info.copy() if info['name'] == 'SOL': converted_balance['value'] = info['amount'] * sol_price - elif address in TOKEN_PRICES: - converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] + elif address in self.TOKEN_PRICES: + converted_balance['value'] = info['amount'] * self.TOKEN_PRICES[address] else: - converted_balance['value'] = None # Price not available + converted_balance['value'] = None logging.warning(f"Price not available for token {info['name']} ({address})") converted_balances[address] = converted_balance return converted_balances - - async def list_initial_wallet_states(): - global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES - global TOKENS_INFO # new - - followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) - your_wallet_balances = await get_wallet_balances(YOUR_WALLET) + async def list_initial_wallet_states(self, FOLLOWED_WALLET, YOUR_WALLET): + followed_wallet_balances = await self.get_wallet_balances(FOLLOWED_WALLET) + your_wallet_balances = await self.get_wallet_balances(YOUR_WALLET) all_token_addresses = list(set(followed_wallet_balances.keys()) | set(your_wallet_balances.keys()) | - set(TOKEN_ADDRESSES.values())) + set(self.TOKEN_ADDRESSES.values())) - TOKEN_PRICES = await get_token_prices(all_token_addresses) - sol_price = await get_sol_price() + self.TOKEN_PRICES = await self.get_token_prices(all_token_addresses) + sol_price = await self.get_sol_price() - followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) - your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) + followed_converted_balances = await self.convert_balances_to_currency(followed_wallet_balances, sol_price) + your_converted_balances = await self.convert_balances_to_currency(your_wallet_balances, sol_price) - - TOKEN_ADDRESSES = { + self.TOKEN_ADDRESSES = { address: info for address, info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0 } - logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") + logging.info(f"Monitoring balances for tokens: {[info['name'] for info in self.TOKEN_ADDRESSES.values()]}") followed_wallet_state = [] - FOLLOWED_WALLET_VALUE = 0 + self.FOLLOWED_WALLET_VALUE = 0 for address, info in followed_converted_balances.items(): if info['value'] is not None and info['value'] > 0: - followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})") - FOLLOWED_WALLET_VALUE += info['value'] + followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {self.DISPLAY_CURRENCY} ({info['address']})") + self.FOLLOWED_WALLET_VALUE += info['value'] your_wallet_state = [] - YOUR_WALLET_VALUE = 0 + self.YOUR_WALLET_VALUE = 0 for address, info in your_converted_balances.items(): if info['value'] is not None and info['value'] > 0: - your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") - YOUR_WALLET_VALUE += info['value'] + your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {self.DISPLAY_CURRENCY}") + self.YOUR_WALLET_VALUE += info['value'] message = ( - f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" + f"Initial Wallet States (All balances in {self.DISPLAY_CURRENCY}):\n\n" f"Followed Wallet ({FOLLOWED_WALLET}):\n" f"{chr(10).join(followed_wallet_state)}\n" - f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Total Value: {self.FOLLOWED_WALLET_VALUE:.2f} {self.DISPLAY_CURRENCY}\n\n" f"Your Wallet ({YOUR_WALLET}):\n" f"{chr(10).join(your_wallet_state)}\n" - f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Total Value: {self.YOUR_WALLET_VALUE:.2f} {self.DISPLAY_CURRENCY}\n\n" f"Monitored Tokens:\n" - f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}" - ) + f"{', '.join([self.safe_get_property(info, 'name') for info in self.TOKEN_ADDRESSES.values()])}" + ) logging.info(message) - await telegram_utils.send_telegram_message(message) + # await telegram_utils.send_telegram_message(message) # save token info to file - await save_token_info() - + await self.save_token_info() + + @staticmethod + def safe_get_property(obj, prop): + return obj.get(prop, 'N/A') + + async def get_token_metadata_symbol(self, token_address): + # Implement this method to fetch token metadata symbol + pass + + async def save_token_info(self): + # Implement this method to save token info to a file + pass + async def save_token_info(): with open('./logs/token_info.json', 'w') as f: json.dump(TOKENS_INFO, f, indent=2) - -SAPI = SolanaAPI( on_initial_subscription_callback=SolanaDEX.list_initial_wallet_states()) \ No newline at end of file +DEX = SolanaDEX(DISPLAY_CURRENCY) +SAPI = SolanaAPI( on_initial_subscription_callback=DEX.list_initial_wallet_states(FOLLOWED_WALLET,YOUR_WALLET)) \ No newline at end of file diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py index cc8ff4d..35b142b 100644 --- a/crypto/sol/modules/utils.py +++ b/crypto/sol/modules/utils.py @@ -10,6 +10,8 @@ from telegram.constants import ParseMode from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME +import asyncio +from typing import Callable, Any import time import logging from logging.handlers import RotatingFileHandler @@ -67,7 +69,7 @@ class Log: # Set up success logger for accounting CSV def __init__(self): - logger = logging.getLogger(__name__) + self.logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) #logging.basicConfig(level=logging.INFO) @@ -109,11 +111,34 @@ def safe_get_property(info, property_name, default='Unknown'): value = info.get(property_name, default) return str(value) if value is not None else str(default) +async def async_safe_call(func: Callable, *args: Any, **kwargs: Any) -> Any: + """ + Safely call a function that might be synchronous, asynchronous, or a coroutine object. + + :param func: The function to call, or a coroutine object + :param args: Positional arguments to pass to the function + :param kwargs: Keyword arguments to pass to the function + :return: The result of the function call, or None if func is not callable or a coroutine + """ + if func is None: + return None + + if callable(func): + if asyncio.iscoroutinefunction(func): + return await func(*args, **kwargs) + else: + return func(*args, **kwargs) + elif asyncio.iscoroutine(func): + # If func is already a coroutine object, just await it + return await func + else: + logging.warning(f"Expected a callable or coroutine, but got {type(func)}: {func}") + return None # Create a global instance of TelegramUtils telegram_utils = TelegramUtils() -log = Log() +log = Log().logger # You can add more Telegram-related methods to the TelegramUtils class if needed