This commit is contained in:
Dobromir Popov 2024-10-12 19:36:36 +03:00
commit be141f1dc0

View File

@ -119,7 +119,12 @@ dexscreener_client = DexscreenerClient()
# Initialize Telegram Bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)
# Create a custom connection pool
conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit
timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout
# Create the bot with the custom connection pool
bot = Bot(TELEGRAM_BOT_TOKEN, request=aiohttp.ClientSession(connector=conn_pool, timeout=timeout).request)
# Token addresses (initialize with some known tokens)
TOKEN_ADDRESSES = {
@ -151,18 +156,45 @@ async def send_telegram_message(message):
async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
global TOKENS_INFO
prices = await get_prices_from_coingecko(token_addresses)
# Skip for USD
prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"}
remaining_tokens = [addr for addr in token_addresses if addr not in prices]
# Try CoinGecko
coingecko_prices = await get_prices_from_coingecko(remaining_tokens)
prices.update(coingecko_prices)
# For remaining missing tokens, try Jupiter
missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens:
jupiter_prices = await get_prices_from_jupiter(list(missing_tokens))
prices.update(jupiter_prices)
# For tokens not found in CoinGecko, use DexScreener
missing_tokens = set(token_addresses) - set(prices.keys())
missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys())
if missing_tokens:
dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens))
prices.update(dexscreener_prices)
# For remaining missing tokens, try Raydium
missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens:
raydium_prices = await get_prices_from_raydium(list(missing_tokens))
prices.update(raydium_prices)
# For remaining missing tokens, try Orca
missing_tokens = set(remaining_tokens) - set(prices.keys())
if missing_tokens:
orca_prices = await get_prices_from_orca(list(missing_tokens))
prices.update(orca_prices)
# If any tokens are still missing, set their prices to 0
for token in set(token_addresses) - set(prices.keys()):
prices[token] = 0.0
logging.warning(f"Price not found for token {token}. Setting to 0.")
for token, price in prices.items():
token_info = TOKENS_INFO.setdefault(token, {})
if 'symbol' not in token_info:
@ -171,43 +203,118 @@ async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
return prices
async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
params = {
"contract_addresses": ",".join(token_addresses),
"vs_currencies": DISPLAY_CURRENCY.lower()
}
base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
prices = {}
async def fetch_single_price(session, address):
params = {
"contract_addresses": address,
"vs_currencies": DISPLAY_CURRENCY.lower()
}
try:
async with session.get(base_url, params=params) as response:
if response.status == 200:
data = await response.json()
if address in data and DISPLAY_CURRENCY.lower() in data[address]:
return address, data[address][DISPLAY_CURRENCY.lower()]
else:
logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}")
return address, None
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
for address, price_info in data.items():
if DISPLAY_CURRENCY.lower() in price_info:
prices[address] = price_info[DISPLAY_CURRENCY.lower()]
else:
logging.error(f"Failed to get token prices from CoinGecko. Status: {response.status}")
tasks = [fetch_single_price(session, address) for address in token_addresses]
results = await asyncio.gather(*tasks)
for address, price in results:
if price is not None:
prices[address] = price
return prices
async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
base_url = "https://api.dexscreener.com/latest/dex/tokens/"
prices = {}
async with aiohttp.ClientSession() as session:
tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
results = await asyncio.gather(*tasks)
for address, result in zip(token_addresses, results):
if result and 'pairs' in result and result['pairs']:
pair = result['pairs'][0] # Use the first pair (usually the most liquid)
prices[address] = float(pair['priceUsd'])
else:
logging.warning(f"No price data found on DexScreener for token {address}")
try:
async with aiohttp.ClientSession() as session:
tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
results = await asyncio.gather(*tasks)
for address, result in zip(token_addresses, results):
if result and 'pairs' in result and result['pairs']:
pair = result['pairs'][0] # Use the first pair (usually the most liquid)
prices[address] = float(pair['priceUsd'])
else:
logging.warning(f"No price data found on DexScreener for token {address}")
except Exception as e:
logging.error(f"Error fetching token prices from DexScreener: {str(e)}")
return prices
async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]:
url = "https://price.jup.ag/v4/price"
params = {
"ids": ",".join(token_addresses)
}
prices = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
for address, price_info in data.get('data', {}).items():
if 'price' in price_info:
prices[address] = float(price_info['price'])
else:
logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching token prices from Jupiter: {str(e)}")
return prices
# New function for Raydium
async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]:
url = "https://api.raydium.io/v2/main/price"
prices = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
for address in token_addresses:
if address in data:
prices[address] = float(data[address])
else:
logging.error(f"Failed to get token prices from Raydium. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching token prices from Raydium: {str(e)}")
return prices
# New function for Orca
async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]:
url = "https://api.orca.so/allTokens"
prices = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
data = await response.json()
for token_info in data:
if token_info['mint'] in token_addresses:
prices[token_info['mint']] = float(token_info['price'])
else:
logging.error(f"Failed to get token prices from Orca. Status: {response.status}")
except Exception as e:
logging.error(f"Error fetching token prices from Orca: {str(e)}")
return prices
async def fetch_token_data(session, url):
try:
async with session.get(url) as response:
@ -836,7 +943,17 @@ async def save_log(log):
PROCESSING_LOG = False
async def process_log(log_result):
global PROCESSING_LOG
tr_details = {
"order_id": None,
"token_in": None,
"token_out": None,
"amount_in": 0,
"amount_out": 0,
"amount_in_USD": 0,
"amount_out_USD": 0,
"percentage_swapped": 0
}
if log_result['value']['err']:
return
@ -854,16 +971,6 @@ async def process_log(log_result):
before_source_balance = 0
source_token_change = 0
tr_details = {
"order_id": None,
"token_in": None,
"token_out": None,
"amount_in": 0,
"amount_out": 0,
"amount_in_USD": 0,
"amount_out_USD": 0,
"percentage_swapped": 0
}
i = 0
while i < len(logs):
log_entry = logs[i]
@ -1023,7 +1130,11 @@ async def follow_move(move):
# Calculate the amount to swap based on the same percentage as the followed move
amount_to_swap = your_balance * (move['percentage_swapped'] / 100)
amount_to_swap = min( min(amount_to_swap, your_balance), 300)
amount_to_swap = min(amount_to_swap, your_balance) # should not happen
if token_name_in == 'USDC': # max 300
amount_to_swap = min(amount_to_swap, 300)
# # always get 99% of the amount to swap
# amount_to_swap = amount_to_swap * 0.95
@ -1033,7 +1144,7 @@ async def follow_move(move):
amount = int(amount_to_swap * 10**decimals)
if your_balance < amount_to_swap:
if your_balance < amount_to_swap: # should not happen
msg = (
f"<b>Warning:</b>\n"
f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway."
@ -1151,9 +1262,10 @@ SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes
# except websockets.exceptions.ConnectionClosed:
# break
first_subscription = True
_first_subscription = True
_process_task = None
async def wallet_watch_loop():
global first_subscription
global first_subscription, process_task
reconnect_delay = 5
max_reconnect_delay = 60
@ -1173,19 +1285,21 @@ async def wallet_watch_loop():
subscription_id = await subscribe(websocket)
if subscription_id is not None:
await send_telegram_message(f"Solana mainnet connected ({subscription_id})...")
if first_subscription:
if _first_subscription:
asyncio.create_task( list_initial_wallet_states())
first_subscription = False
process_task = asyncio.create_task(process_messages(websocket, subscription_id))
_first_subscription = False
_process_task = asyncio.create_task(process_messages(websocket, subscription_id))
while True:
try:
await asyncio.wait_for(process_task, timeout=SUBSCRIBE_INTERVAL)
try:# drop subscription now
await process_messages(websocket, subscription_id)
# await asyncio.run(_process_task)
# await asyncio.wait_for(_process_task, timeout=SUBSCRIBE_INTERVAL)
except asyncio.TimeoutError:
# Timeout occurred, time to resubscribe
if not PROCESSING_LOG:
process_task.cancel()
_process_task.cancel()
try:
await process_task
await _process_task
except asyncio.CancelledError:
pass
await unsubscribe(websocket, subscription_id)