wip - still not working

This commit is contained in:
Dobromir Popov 2024-10-22 14:10:42 +03:00
parent 86b3a086b9
commit 1a31455d98
3 changed files with 133 additions and 101 deletions

View File

@ -432,9 +432,9 @@ async def process_messages(websocket):
pk = None pk = None
app = init_app()
# Convert Flask app to ASGI # Convert Flask app to ASGI
asgi_app = WsgiToAsgi(init_app) asgi_app = WsgiToAsgi(app)
async def main(): async def main():
global solanaAPI, bot, PROCESSING_LOG, pk global solanaAPI, bot, PROCESSING_LOG, pk

View File

@ -53,10 +53,10 @@ PING_INTERVAL = 30
SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 1 minute SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 1 minute
from config import ( from config import (
FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS, YOUR_WALLET
) )
from modules.utils import telegram_utils from modules.utils import telegram_utils, async_safe_call
# Use the production Solana RPC endpoint # Use the production Solana RPC endpoint
solana_client = AsyncClient(SOLANA_HTTP_URL) solana_client = AsyncClient(SOLANA_HTTP_URL)
@ -124,7 +124,9 @@ class SolanaWS:
] ]
# define onmessage as inline callback to get subscription_id which waits for last_msg_responded # define onmessage as inline callback to get subscription_id which waits for last_msg_responded
# self.on_message = lambda message: self.subscription_id = message.get('result') # self.on_message = lambda message: self.subscription_id = message.get('result')
result = await self.ws_jsonrpc("logsSubscribe", params) await self.ws_jsonrpc("logsSubscribe", params, False)
await self.receive_messages(True)
result = await self.process_messages(True)
if result is not None and result > 0: if result is not None and result > 0:
self.subscription_id = result self.subscription_id = result
@ -143,11 +145,17 @@ class SolanaWS:
else: else:
logger.error(f"Failed to unsubscribe from subscription id: {self.subscription_id}") logger.error(f"Failed to unsubscribe from subscription id: {self.subscription_id}")
async def receive_messages(self): async def receive_messages(self, one = False):
while True: while True:
try: try:
message = await self.websocket.recv() response = await self.websocket.recv()
await self.message_queue.put(message) response_data = json.loads(response)
self.last_msg_responded = True
if 'result' in response_data:
await self.message_queue.put(response_data['result'])
if one:
break
except websockets.exceptions.ConnectionClosedError: except websockets.exceptions.ConnectionClosedError:
logger.error("WebSocket connection closed") logger.error("WebSocket connection closed")
break break
@ -155,12 +163,15 @@ class SolanaWS:
logger.error(f"Error receiving message: {e}") logger.error(f"Error receiving message: {e}")
break break
async def process_messages(self): async def process_messages(self, one = False):
while True: while True:
message = await self.message_queue.get() message = await self.message_queue.get()
message = json.loads(message)
if self.on_message: if self.on_message:
await self.on_message(message) await self.on_message(message)
logger.info(f"Received message: {message}") logger.info(f"Received message: {message}")
if one:
return message
async def close(self): async def close(self):
if self.websocket: if self.websocket:
@ -206,7 +217,7 @@ class SolanaAPI:
# Use the provided on_bot_message if it's callable, otherwise use the default # Use the provided on_bot_message if it's callable, otherwise use the default
self.on_bot_message = on_bot_message if callable(on_bot_message) else default_on_bot_message self.on_bot_message = on_bot_message if callable(on_bot_message) else default_on_bot_message
self.dex = SolanaDEX(DISPLAY_CURRENCY) self.dex = DEX
self.solana_ws = SolanaWS(on_message=self.process_transaction) self.solana_ws = SolanaWS(on_message=self.process_transaction)
async def process_messages(self, solana_ws): async def process_messages(self, solana_ws):
@ -226,11 +237,11 @@ class SolanaAPI:
await solana_ws.connect() await solana_ws.connect()
await solana_ws.subscribe() await solana_ws.subscribe()
if first_subscription and self.on_initial_subscription is not None: if first_subscription:
await self.on_initial_subscription await async_safe_call( self.on_initial_subscription, solana_ws.subscription_id)
first_subscription = False first_subscription = False
self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") await async_safe_call(self.on_bot_message,f"Solana mainnet connected ({solana_ws.subscription_id})...")
receive_task = asyncio.create_task(solana_ws.receive_messages()) receive_task = asyncio.create_task(solana_ws.receive_messages())
process_task = asyncio.create_task(solana_ws.process_messages()) process_task = asyncio.create_task(solana_ws.process_messages())
@ -249,8 +260,7 @@ class SolanaAPI:
await solana_ws.unsubscribe() await solana_ws.unsubscribe()
if solana_ws.websocket: if solana_ws.websocket:
await solana_ws.close() await solana_ws.close()
if self.on_bot_message: await async_safe_call(self.on_bot_message,"Reconnecting...")
await self.on_bot_message("Reconnecting...")
await asyncio.sleep(5) await asyncio.sleep(5)
async def get_last_transactions(self, account_address, check_interval=300, limit=1000): async def get_last_transactions(self, account_address, check_interval=300, limit=1000):
@ -559,75 +569,69 @@ class SolanaAPI:
class SolanaDEX: class SolanaDEX:
def __init__(self, DISPLAY_CURRENCY): def __init__(self, DISPLAY_CURRENCY: str):
self.DISPLAY_CURRENCY = DISPLAY_CURRENCY self.DISPLAY_CURRENCY = DISPLAY_CURRENCY
pass self.solana_client = AsyncClient("https://api.mainnet-beta.solana.com")
self.TOKENS_INFO = {}
self.TOKEN_PRICES = {}
self.TOKEN_ADDRESSES = {}
self.FOLLOWED_WALLET_VALUE = 0
self.YOUR_WALLET_VALUE = 0
async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: async def get_token_prices(self, token_addresses: List[str]) -> Dict[str, float]:
global TOKENS_INFO
# Skip for USD
prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"} prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"}
remaining_tokens = [addr for addr in token_addresses if addr not in prices] remaining_tokens = [addr for addr in token_addresses if addr not in prices]
# Try CoinGecko
coingecko_prices = await self.get_prices_from_coingecko(remaining_tokens) coingecko_prices = await self.get_prices_from_coingecko(remaining_tokens)
prices.update(coingecko_prices) prices.update(coingecko_prices)
# For remaining missing tokens, try Jupiter
missing_tokens = set(remaining_tokens) - set(prices.keys()) missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens: if missing_tokens:
jupiter_prices = await get_prices_from_jupiter(list(missing_tokens)) jupiter_prices = await self.get_prices_from_jupiter(list(missing_tokens))
prices.update(jupiter_prices) prices.update(jupiter_prices)
# For tokens not found in CoinGecko, use DexScreener
missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys()) missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys())
if missing_tokens: if missing_tokens:
dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens)) dexscreener_prices = await self.get_prices_from_dexscreener(list(missing_tokens))
prices.update(dexscreener_prices) prices.update(dexscreener_prices)
# For remaining missing tokens, try Raydium
missing_tokens = set(remaining_tokens) - set(prices.keys()) missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens: if missing_tokens:
raydium_prices = await get_prices_from_raydium(list(missing_tokens)) raydium_prices = await self.get_prices_from_raydium(list(missing_tokens))
prices.update(raydium_prices) prices.update(raydium_prices)
# For remaining missing tokens, try Orca
missing_tokens = set(remaining_tokens) - set(prices.keys()) missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens: if missing_tokens:
orca_prices = await get_prices_from_orca(list(missing_tokens)) orca_prices = await self.get_prices_from_orca(list(missing_tokens))
prices.update(orca_prices) prices.update(orca_prices)
# If any tokens are still missing, set their prices to 0
for token in set(token_addresses) - set(prices.keys()): for token in set(token_addresses) - set(prices.keys()):
prices[token] = 0.0 prices[token] = 0.0
logging.warning(f"Price not found for token {token}. Setting to 0.") logging.warning(f"Price not found for token {token}. Setting to 0.")
for token, price in prices.items(): for token, price in prices.items():
token_info = TOKENS_INFO.setdefault(token, {}) token_info = self.TOKENS_INFO.setdefault(token, {})
if 'symbol' not in token_info: if 'symbol' not in token_info:
token_info['symbol'] = await get_token_metadata_symbol(token) token_info['symbol'] = await self.get_token_metadata_symbol(token)
token_info['price'] = price token_info['price'] = price
return prices return prices
async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]: async def get_prices_from_coingecko(self, token_addresses: List[str]) -> Dict[str, float]:
base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana" base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
prices = {} prices = {}
async def fetch_single_price(session, address): async def fetch_single_price(session, address):
params = { params = {
"contract_addresses": address, "contract_addresses": address,
"vs_currencies": DISPLAY_CURRENCY.lower() "vs_currencies": self.DISPLAY_CURRENCY.lower()
} }
try: try:
async with session.get(base_url, params=params) as response: async with session.get(base_url, params=params) as response:
if response.status == 200: if response.status == 200:
data = await response.json() data = await response.json()
if address in data and DISPLAY_CURRENCY.lower() in data[address]: if address in data and self.DISPLAY_CURRENCY.lower() in data[address]:
return address, data[address][DISPLAY_CURRENCY.lower()] return address, data[address][self.DISPLAY_CURRENCY.lower()]
else: else:
logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}") logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}")
except Exception as e: except Exception as e:
@ -644,13 +648,13 @@ class SolanaDEX:
return prices return prices
async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]: async def get_prices_from_dexscreener(self, token_addresses: List[str]) -> Dict[str, float]:
base_url = "https://api.dexscreener.com/latest/dex/tokens/" base_url = "https://api.dexscreener.com/latest/dex/tokens/"
prices = {} prices = {}
try: try:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] tasks = [self.fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
results = await asyncio.gather(*tasks) results = await asyncio.gather(*tasks)
for address, result in zip(token_addresses, results): for address, result in zip(token_addresses, results):
@ -664,7 +668,7 @@ class SolanaDEX:
return prices return prices
async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]: async def get_prices_from_jupiter(self, token_addresses: List[str]) -> Dict[str, float]:
url = "https://price.jup.ag/v4/price" url = "https://price.jup.ag/v4/price"
params = { params = {
"ids": ",".join(token_addresses) "ids": ",".join(token_addresses)
@ -685,8 +689,7 @@ class SolanaDEX:
logging.error(f"Error fetching token prices from Jupiter: {str(e)}") logging.error(f"Error fetching token prices from Jupiter: {str(e)}")
return prices return prices
# New function for Raydium async def get_prices_from_raydium(self, token_addresses: List[str]) -> Dict[str, float]:
async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]:
url = "https://api.raydium.io/v2/main/price" url = "https://api.raydium.io/v2/main/price"
prices = {} prices = {}
@ -704,8 +707,7 @@ class SolanaDEX:
logging.error(f"Error fetching token prices from Raydium: {str(e)}") logging.error(f"Error fetching token prices from Raydium: {str(e)}")
return prices return prices
# New function for Orca async def get_prices_from_orca(self, token_addresses: List[str]) -> Dict[str, float]:
async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]:
url = "https://api.orca.so/allTokens" url = "https://api.orca.so/allTokens"
prices = {} prices = {}
@ -723,6 +725,7 @@ class SolanaDEX:
logging.error(f"Error fetching token prices from Orca: {str(e)}") logging.error(f"Error fetching token prices from Orca: {str(e)}")
return prices return prices
@staticmethod
async def fetch_token_data(session, url): async def fetch_token_data(session, url):
try: try:
async with session.get(url) as response: async with session.get(url) as response:
@ -735,17 +738,16 @@ class SolanaDEX:
logging.error(f"Error fetching data from {url}: {str(e)}") logging.error(f"Error fetching data from {url}: {str(e)}")
return None return None
async def get_sol_price() -> float: async def get_sol_price(self) -> float:
sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address
return await get_token_prices([sol_address]).get(sol_address, 0.0) prices = await self.get_token_prices([sol_address])
return prices.get(sol_address, 0.0)
async def get_wallet_balances(self, wallet_address, doGetTokenName=True):
async def get_wallet_balances(wallet_address, doGetTokenName=True):
balances = {} balances = {}
logging.info(f"Getting balances for wallet: {wallet_address}") logging.info(f"Getting balances for wallet: {wallet_address}")
global TOKENS_INFO
try: try:
response = await solana_client.get_token_accounts_by_owner_json_parsed( response = await self.solana_client.get_token_accounts_by_owner_json_parsed(
Pubkey.from_string(wallet_address), Pubkey.from_string(wallet_address),
opts=TokenAccountOpts( opts=TokenAccountOpts(
program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
@ -764,29 +766,27 @@ class SolanaDEX:
decimals = info['tokenAmount']['decimals'] decimals = info['tokenAmount']['decimals']
amount = float(info['tokenAmount']['amount'])/10**decimals amount = float(info['tokenAmount']['amount'])/10**decimals
if amount > 0: if amount > 0:
if mint in TOKENS_INFO: if mint in self.TOKENS_INFO:
token_name = TOKENS_INFO[mint].get('symbol') token_name = self.TOKENS_INFO[mint].get('symbol')
elif doGetTokenName: elif doGetTokenName:
token_name = await get_token_metadata_symbol(mint) or 'N/A' token_name = await self.get_token_metadata_symbol(mint) or 'N/A'
# sleep for 1 second to avoid rate limiting
await asyncio.sleep(2) await asyncio.sleep(2)
TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals) self.TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals)
TOKENS_INFO[mint]['decimals'] = decimals self.TOKENS_INFO[mint]['decimals'] = decimals
balances[mint] = { balances[mint] = {
'name': token_name or 'N/A', 'name': token_name or 'N/A',
'address': mint, 'address': mint,
'amount': amount, 'amount': amount,
'decimals': decimals 'decimals': decimals
} }
# sleep for 1 second to avoid rate limiting
logging.debug(f"Account balance for {token_name} ({mint}): {amount}") logging.debug(f"Account balance for {token_name} ({mint}): {amount}")
else: else:
logging.warning(f"Unexpected data format for account: {account}") logging.warning(f"Unexpected data format for account: {account}")
except Exception as e: except Exception as e:
logging.error(f"Error parsing account data: {str(e)}") logging.error(f"Error parsing account data: {str(e)}")
sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) sol_balance = await self.solana_client.get_balance(Pubkey.from_string(wallet_address))
if sol_balance.value is not None: if sol_balance.value is not None:
balances['SOL'] = { balances['SOL'] = {
'name': 'SOL', 'name': 'SOL',
@ -801,80 +801,87 @@ class SolanaDEX:
logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}")
return balances return balances
async def convert_balances_to_currency(balances , sol_price): async def convert_balances_to_currency(self, balances, sol_price):
converted_balances = {} converted_balances = {}
for address, info in balances.items(): for address, info in balances.items():
converted_balance = info.copy() # Create a copy of the original info converted_balance = info.copy()
if info['name'] == 'SOL': if info['name'] == 'SOL':
converted_balance['value'] = info['amount'] * sol_price converted_balance['value'] = info['amount'] * sol_price
elif address in TOKEN_PRICES: elif address in self.TOKEN_PRICES:
converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] converted_balance['value'] = info['amount'] * self.TOKEN_PRICES[address]
else: else:
converted_balance['value'] = None # Price not available converted_balance['value'] = None
logging.warning(f"Price not available for token {info['name']} ({address})") logging.warning(f"Price not available for token {info['name']} ({address})")
converted_balances[address] = converted_balance converted_balances[address] = converted_balance
return converted_balances return converted_balances
async def list_initial_wallet_states(self, FOLLOWED_WALLET, YOUR_WALLET):
async def list_initial_wallet_states(): followed_wallet_balances = await self.get_wallet_balances(FOLLOWED_WALLET)
global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES your_wallet_balances = await self.get_wallet_balances(YOUR_WALLET)
global TOKENS_INFO # new
followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
all_token_addresses = list(set(followed_wallet_balances.keys()) | all_token_addresses = list(set(followed_wallet_balances.keys()) |
set(your_wallet_balances.keys()) | set(your_wallet_balances.keys()) |
set(TOKEN_ADDRESSES.values())) set(self.TOKEN_ADDRESSES.values()))
TOKEN_PRICES = await get_token_prices(all_token_addresses) self.TOKEN_PRICES = await self.get_token_prices(all_token_addresses)
sol_price = await get_sol_price() sol_price = await self.get_sol_price()
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) followed_converted_balances = await self.convert_balances_to_currency(followed_wallet_balances, sol_price)
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) your_converted_balances = await self.convert_balances_to_currency(your_wallet_balances, sol_price)
self.TOKEN_ADDRESSES = {
TOKEN_ADDRESSES = {
address: info for address, address: info for address,
info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0 info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0
} }
logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") logging.info(f"Monitoring balances for tokens: {[info['name'] for info in self.TOKEN_ADDRESSES.values()]}")
followed_wallet_state = [] followed_wallet_state = []
FOLLOWED_WALLET_VALUE = 0 self.FOLLOWED_WALLET_VALUE = 0
for address, info in followed_converted_balances.items(): for address, info in followed_converted_balances.items():
if info['value'] is not None and info['value'] > 0: if info['value'] is not None and info['value'] > 0:
followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})") followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {self.DISPLAY_CURRENCY} ({info['address']})")
FOLLOWED_WALLET_VALUE += info['value'] self.FOLLOWED_WALLET_VALUE += info['value']
your_wallet_state = [] your_wallet_state = []
YOUR_WALLET_VALUE = 0 self.YOUR_WALLET_VALUE = 0
for address, info in your_converted_balances.items(): for address, info in your_converted_balances.items():
if info['value'] is not None and info['value'] > 0: if info['value'] is not None and info['value'] > 0:
your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {self.DISPLAY_CURRENCY}")
YOUR_WALLET_VALUE += info['value'] self.YOUR_WALLET_VALUE += info['value']
message = ( message = (
f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n" f"<b>Initial Wallet States (All balances in {self.DISPLAY_CURRENCY}):</b>\n\n"
f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n" f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
f"{chr(10).join(followed_wallet_state)}\n" f"{chr(10).join(followed_wallet_state)}\n"
f"<b>Total Value:</b> {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" f"<b>Total Value:</b> {self.FOLLOWED_WALLET_VALUE:.2f} {self.DISPLAY_CURRENCY}\n\n"
f"<b>Your Wallet ({YOUR_WALLET}):</b>\n" f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
f"{chr(10).join(your_wallet_state)}\n" f"{chr(10).join(your_wallet_state)}\n"
f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" f"<b>Total Value:</b> {self.YOUR_WALLET_VALUE:.2f} {self.DISPLAY_CURRENCY}\n\n"
f"<b>Monitored Tokens:</b>\n" f"<b>Monitored Tokens:</b>\n"
f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}" f"{', '.join([self.safe_get_property(info, 'name') for info in self.TOKEN_ADDRESSES.values()])}"
) )
logging.info(message) logging.info(message)
await telegram_utils.send_telegram_message(message) # await telegram_utils.send_telegram_message(message)
# save token info to file # save token info to file
await save_token_info() await self.save_token_info()
@staticmethod
def safe_get_property(obj, prop):
return obj.get(prop, 'N/A')
async def get_token_metadata_symbol(self, token_address):
# Implement this method to fetch token metadata symbol
pass
async def save_token_info(self):
# Implement this method to save token info to a file
pass
async def save_token_info(): async def save_token_info():
with open('./logs/token_info.json', 'w') as f: with open('./logs/token_info.json', 'w') as f:
json.dump(TOKENS_INFO, f, indent=2) json.dump(TOKENS_INFO, f, indent=2)
DEX = SolanaDEX(DISPLAY_CURRENCY)
SAPI = SolanaAPI( on_initial_subscription_callback=SolanaDEX.list_initial_wallet_states()) SAPI = SolanaAPI( on_initial_subscription_callback=DEX.list_initial_wallet_states(FOLLOWED_WALLET,YOUR_WALLET))

View File

@ -10,6 +10,8 @@ from telegram.constants import ParseMode
from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME
import asyncio
from typing import Callable, Any
import time import time
import logging import logging
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
@ -67,7 +69,7 @@ class Log:
# Set up success logger for accounting CSV # Set up success logger for accounting CSV
def __init__(self): def __init__(self):
logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
#logging.basicConfig(level=logging.INFO) #logging.basicConfig(level=logging.INFO)
@ -109,11 +111,34 @@ def safe_get_property(info, property_name, default='Unknown'):
value = info.get(property_name, default) value = info.get(property_name, default)
return str(value) if value is not None else str(default) return str(value) if value is not None else str(default)
async def async_safe_call(func: Callable, *args: Any, **kwargs: Any) -> Any:
"""
Safely call a function that might be synchronous, asynchronous, or a coroutine object.
:param func: The function to call, or a coroutine object
:param args: Positional arguments to pass to the function
:param kwargs: Keyword arguments to pass to the function
:return: The result of the function call, or None if func is not callable or a coroutine
"""
if func is None:
return None
if callable(func):
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return func(*args, **kwargs)
elif asyncio.iscoroutine(func):
# If func is already a coroutine object, just await it
return await func
else:
logging.warning(f"Expected a callable or coroutine, but got {type(func)}: {func}")
return None
# Create a global instance of TelegramUtils # Create a global instance of TelegramUtils
telegram_utils = TelegramUtils() telegram_utils = TelegramUtils()
log = Log() log = Log().logger
# You can add more Telegram-related methods to the TelegramUtils class if needed # You can add more Telegram-related methods to the TelegramUtils class if needed