Merge branch 'master' of https://git.d-popov.com/popov/ai-kevin - reset
This commit is contained in:
@@ -1,11 +1,12 @@
|
||||
import asyncio
|
||||
import uvicorn
|
||||
from asgiref.wsgi import WsgiToAsgi
|
||||
import websockets
|
||||
import json
|
||||
from flask import Flask, render_template, request, jsonify
|
||||
from solana.rpc.async_api import AsyncClient
|
||||
from solana.transaction import Signature
|
||||
from solana.rpc.websocket_api import connect
|
||||
from solana.rpc.types import TokenAccountOpts, TxOpts
|
||||
from solana.rpc.commitment import Confirmed, Processed
|
||||
from solana.transaction import Transaction
|
||||
from spl.token.client import Token
|
||||
@@ -24,8 +25,8 @@ from solders.instruction import CompiledInstruction
|
||||
from solders import message
|
||||
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
|
||||
from dexscreener import DexscreenerClient
|
||||
from telegram import Bot
|
||||
from telegram.constants import ParseMode
|
||||
from solana.rpc.types import TokenAccountOpts, TxOpts
|
||||
|
||||
import datetime
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
@@ -35,7 +36,6 @@ from dotenv import load_dotenv,set_key
|
||||
import aiohttp
|
||||
from typing import List, Dict
|
||||
import requests
|
||||
import threading
|
||||
import re
|
||||
from typing import List, Dict, Any, Tuple
|
||||
import random
|
||||
@@ -131,6 +131,7 @@ def get_latest_log_file():
|
||||
return None
|
||||
|
||||
# Flask route to retry processing the last log
|
||||
@app.route('/retry', methods=['GET'])
|
||||
@app.route('/retry-last-log', methods=['GET'])
|
||||
async def retry_last_log():
|
||||
latest_log_file = get_latest_log_file()
|
||||
@@ -155,9 +156,42 @@ async def retry_last_log():
|
||||
return jsonify({"error": "Failed to process log"}), 500
|
||||
|
||||
|
||||
#const webhookPath = `/tr/${followedWallet.toBase58()}/${logs.signature}`;
|
||||
@app.route('/tr/<wallet>/<tx_signature>', methods=['GET', 'POST'])
|
||||
async def transaction_notified(wallet, tx_signature):
|
||||
try:
|
||||
logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}")
|
||||
# Process the transaction
|
||||
# tr = await get_swap_transaction_details(tx_signature)
|
||||
tr = await get_transaction_details_info(tx_signature, [])
|
||||
# ToDo - probably optimize
|
||||
await get_token_metadata_symbol(tr['token_in'])
|
||||
await get_token_metadata_symbol(tr['token_out'])
|
||||
await follow_move(tr)
|
||||
await save_token_info()
|
||||
return jsonify(tr), 200
|
||||
except Exception as e:
|
||||
logging.error(f"Error processing transaction: {e}")
|
||||
return jsonify({"error": "Failed to process transaction"}), 500
|
||||
|
||||
|
||||
|
||||
# Configuration
|
||||
DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID")
|
||||
FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET")
|
||||
YOUR_WALLET = os.getenv("YOUR_WALLET")
|
||||
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
|
||||
SOLANA_WS_URL = os.getenv("SOLANA_WS_URL")
|
||||
SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL")
|
||||
DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD')
|
||||
|
||||
|
||||
# Use the production Solana RPC endpoint
|
||||
solana_client = AsyncClient(SOLANA_HTTP_URL)
|
||||
dexscreener_client = DexscreenerClient()
|
||||
|
||||
|
||||
|
||||
# Create the bot with the custom connection pool
|
||||
bot = None
|
||||
# Token addresses (initialize with some known tokens)
|
||||
TOKEN_ADDRESSES = {
|
||||
"SOL": "So11111111111111111111111111111111111111112",
|
||||
@@ -173,208 +207,27 @@ except Exception as e:
|
||||
logging.error(f"Error loading token info: {str(e)}")
|
||||
|
||||
# # # # # # # # # # TELEGRAM # # # # # # # # # #
|
||||
async def send_telegram_message(message):
|
||||
if not telegram_utils.bot:
|
||||
try:
|
||||
await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML)
|
||||
logging.info(f"Telegram message sent: {message}")
|
||||
# logging.info(f"Telegram message dummy sent: {message}")
|
||||
asyncio.run(telegram_utils.initialize())
|
||||
except Exception as e:
|
||||
logging.error(f"Error sending Telegram message: {str(e)}")
|
||||
logging.error(f"Error initializing Telegram bot: {str(e)}")
|
||||
# async def telegram_utils.send_telegram_message(message):
|
||||
# try:
|
||||
# await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML)
|
||||
# logging.info(f"Telegram message sent: {message}")
|
||||
# # logging.info(f"Telegram message dummy sent: {message}")
|
||||
# except Exception as e:
|
||||
# logging.error(f"Error sending Telegram message: {str(e)}")
|
||||
|
||||
|
||||
|
||||
# # # # # # # # # # DATABASE # # # # # # # # # #
|
||||
|
||||
|
||||
|
||||
# # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # #
|
||||
|
||||
async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
|
||||
global TOKENS_INFO
|
||||
|
||||
# Skip for USD
|
||||
prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"}
|
||||
remaining_tokens = [addr for addr in token_addresses if addr not in prices]
|
||||
|
||||
# Try CoinGecko
|
||||
coingecko_prices = await get_prices_from_coingecko(remaining_tokens)
|
||||
prices.update(coingecko_prices)
|
||||
|
||||
|
||||
# For remaining missing tokens, try Jupiter
|
||||
missing_tokens = set(remaining_tokens) - set(prices.keys())
|
||||
if missing_tokens:
|
||||
jupiter_prices = await get_prices_from_jupiter(list(missing_tokens))
|
||||
prices.update(jupiter_prices)
|
||||
|
||||
|
||||
# For tokens not found in CoinGecko, use DexScreener
|
||||
missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys())
|
||||
if missing_tokens:
|
||||
dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens))
|
||||
prices.update(dexscreener_prices)
|
||||
|
||||
# For remaining missing tokens, try Raydium
|
||||
missing_tokens = set(remaining_tokens) - set(prices.keys())
|
||||
if missing_tokens:
|
||||
raydium_prices = await get_prices_from_raydium(list(missing_tokens))
|
||||
prices.update(raydium_prices)
|
||||
|
||||
# For remaining missing tokens, try Orca
|
||||
missing_tokens = set(remaining_tokens) - set(prices.keys())
|
||||
if missing_tokens:
|
||||
orca_prices = await get_prices_from_orca(list(missing_tokens))
|
||||
prices.update(orca_prices)
|
||||
|
||||
# If any tokens are still missing, set their prices to 0
|
||||
for token in set(token_addresses) - set(prices.keys()):
|
||||
prices[token] = 0.0
|
||||
logging.warning(f"Price not found for token {token}. Setting to 0.")
|
||||
|
||||
for token, price in prices.items():
|
||||
token_info = TOKENS_INFO.setdefault(token, {})
|
||||
if 'symbol' not in token_info:
|
||||
token_info['symbol'] = await get_token_metadata_symbol(token)
|
||||
token_info['price'] = price
|
||||
|
||||
return prices
|
||||
|
||||
|
||||
async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
|
||||
base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
|
||||
prices = {}
|
||||
|
||||
async def fetch_single_price(session, address):
|
||||
params = {
|
||||
"contract_addresses": address,
|
||||
"vs_currencies": DISPLAY_CURRENCY.lower()
|
||||
}
|
||||
try:
|
||||
async with session.get(base_url, params=params) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
if address in data and DISPLAY_CURRENCY.lower() in data[address]:
|
||||
return address, data[address][DISPLAY_CURRENCY.lower()]
|
||||
else:
|
||||
logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}")
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}")
|
||||
return address, None
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
tasks = [fetch_single_price(session, address) for address in token_addresses]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
for address, price in results:
|
||||
if price is not None:
|
||||
prices[address] = price
|
||||
|
||||
return prices
|
||||
|
||||
async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
|
||||
base_url = "https://api.dexscreener.com/latest/dex/tokens/"
|
||||
prices = {}
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
for address, result in zip(token_addresses, results):
|
||||
if result and 'pairs' in result and result['pairs']:
|
||||
pair = result['pairs'][0] # Use the first pair (usually the most liquid)
|
||||
prices[address] = float(pair['priceUsd'])
|
||||
else:
|
||||
logging.warning(f"No price data found on DexScreener for token {address}")
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching token prices from DexScreener: {str(e)}")
|
||||
|
||||
return prices
|
||||
|
||||
async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]:
|
||||
url = "https://price.jup.ag/v4/price"
|
||||
params = {
|
||||
"ids": ",".join(token_addresses)
|
||||
}
|
||||
prices = {}
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url, params=params) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
for address, price_info in data.get('data', {}).items():
|
||||
if 'price' in price_info:
|
||||
prices[address] = float(price_info['price'])
|
||||
else:
|
||||
logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}")
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching token prices from Jupiter: {str(e)}")
|
||||
return prices
|
||||
|
||||
# New function for Raydium
|
||||
async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]:
|
||||
url = "https://api.raydium.io/v2/main/price"
|
||||
prices = {}
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
for address in token_addresses:
|
||||
if address in data:
|
||||
prices[address] = float(data[address])
|
||||
else:
|
||||
logging.error(f"Failed to get token prices from Raydium. Status: {response.status}")
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching token prices from Raydium: {str(e)}")
|
||||
return prices
|
||||
|
||||
# New function for Orca
|
||||
async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]:
|
||||
url = "https://api.orca.so/allTokens"
|
||||
prices = {}
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
for token_info in data:
|
||||
if token_info['mint'] in token_addresses:
|
||||
prices[token_info['mint']] = float(token_info['price'])
|
||||
else:
|
||||
logging.error(f"Failed to get token prices from Orca. Status: {response.status}")
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching token prices from Orca: {str(e)}")
|
||||
return prices
|
||||
|
||||
|
||||
async def fetch_token_data(session, url):
|
||||
try:
|
||||
async with session.get(url) as response:
|
||||
if response.status == 200:
|
||||
return await response.json()
|
||||
else:
|
||||
logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching data from {url}: {str(e)}")
|
||||
return None
|
||||
|
||||
async def get_sol_price() -> float:
|
||||
url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}"
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
return data['solana'][DISPLAY_CURRENCY.lower()]
|
||||
else:
|
||||
logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}")
|
||||
return await get_sol_price_from_dexscreener()
|
||||
|
||||
async def get_sol_price_from_dexscreener() -> float:
|
||||
sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address
|
||||
prices = await get_prices_from_dexscreener([sol_address])
|
||||
return prices.get(sol_address, 0.0)
|
||||
|
||||
|
||||
# # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # #
|
||||
|
||||
@@ -439,44 +292,9 @@ from spl.token.async_client import AsyncToken
|
||||
from spl.token.constants import TOKEN_PROGRAM_ID
|
||||
from borsh_construct import String, CStruct
|
||||
|
||||
async def get_token_metadata_symbol(mint_address):
|
||||
global TOKENS_INFO
|
||||
|
||||
|
||||
|
||||
if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]:
|
||||
return TOKENS_INFO[mint_address].get('symbol')
|
||||
|
||||
try:
|
||||
account_data_result = await solana_jsonrpc("getAccountInfo", mint_address)
|
||||
if 'value' in account_data_result and 'data' in account_data_result['value']:
|
||||
account_data_data = account_data_result['value']['data']
|
||||
if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
|
||||
account_data_info = account_data_data['parsed']['info']
|
||||
if 'decimals' in account_data_info:
|
||||
if mint_address in TOKENS_INFO:
|
||||
TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals']
|
||||
else:
|
||||
TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']}
|
||||
if 'tokenName' in account_data_info:
|
||||
if mint_address in TOKENS_INFO:
|
||||
TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName']
|
||||
else:
|
||||
TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']}
|
||||
|
||||
metadata = await get_token_metadata(mint_address)
|
||||
if metadata:
|
||||
if mint_address in TOKENS_INFO:
|
||||
TOKENS_INFO[mint_address].update(metadata)
|
||||
else:
|
||||
TOKENS_INFO[mint_address] = metadata
|
||||
await save_token_info()
|
||||
# TOKENS_INFO[mint_address] = metadata
|
||||
# return metadata.get('symbol') or metadata.get('name')
|
||||
return TOKENS_INFO[mint_address].get('symbol')
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
|
||||
return None
|
||||
|
||||
|
||||
METADATA_STRUCT = CStruct(
|
||||
"update_authority" / String,
|
||||
"mint" / String,
|
||||
@@ -610,7 +428,7 @@ async def get_wallet_balances(wallet_address, doGetTokenName=True):
|
||||
# sleep for 1 second to avoid rate limiting
|
||||
await asyncio.sleep(2)
|
||||
|
||||
TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals)
|
||||
TOKENS_INFO[mint]['holdedAmount'] = round(amount, decimals)
|
||||
TOKENS_INFO[mint]['decimals'] = decimals
|
||||
balances[mint] = {
|
||||
'name': token_name or 'N/A',
|
||||
@@ -697,244 +515,10 @@ async def get_swap_transaction_details(tx_signature_str):
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# # # RAW Solana API RPC # # #
|
||||
|
||||
#this is the meat of the application
|
||||
async def get_transaction_details_rpc(tx_signature, readfromDump=False):
|
||||
global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO
|
||||
try:
|
||||
if readfromDump and os.path.exists('./logs/transation_details.json'):
|
||||
with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details
|
||||
transaction_details = json.load(f)
|
||||
return transaction_details
|
||||
else:
|
||||
transaction_details = await solana_jsonrpc("getTransaction", tx_signature)
|
||||
with open('./logs/transation_details.json', 'w') as f:
|
||||
json.dump(transaction_details, f, indent=2)
|
||||
|
||||
if transaction_details is None:
|
||||
logging.error(f"Error fetching transaction details for {tx_signature}")
|
||||
return None
|
||||
|
||||
# Initialize default result structure
|
||||
parsed_result = {
|
||||
"order_id": None,
|
||||
"token_in": None,
|
||||
"token_out": None,
|
||||
"amount_in": 0,
|
||||
"amount_out": 0,
|
||||
"amount_in_USD": 0,
|
||||
"amount_out_USD": 0,
|
||||
"percentage_swapped": 0
|
||||
}
|
||||
|
||||
# Extract order_id from logs
|
||||
log_messages = transaction_details.get("meta", {}).get("logMessages", [])
|
||||
for log in log_messages:
|
||||
if "order_id" in log:
|
||||
parsed_result["order_id"] = log.split(":")[2].strip()
|
||||
break
|
||||
|
||||
# Extract token transfers from innerInstructions
|
||||
inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
|
||||
for instruction_set in inner_instructions:
|
||||
for instruction in instruction_set.get('instructions', []):
|
||||
if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked':
|
||||
info = instruction['parsed']['info']
|
||||
mint = info['mint']
|
||||
amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals
|
||||
|
||||
# Determine which token is being swapped in and out based on zero balances
|
||||
if parsed_result["token_in"] is None and amount > 0:
|
||||
parsed_result["token_in"] = mint
|
||||
parsed_result["amount_in"] = amount
|
||||
|
||||
|
||||
if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
|
||||
# if we've failed to extract token_in and token_out from the transaction details, try a second method
|
||||
inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', [])
|
||||
transfers = []
|
||||
|
||||
for instruction_set in inner_instructions:
|
||||
for instruction in instruction_set.get('instructions', []):
|
||||
if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']:
|
||||
info = instruction['parsed']['info']
|
||||
amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount'])
|
||||
decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0
|
||||
adjusted_amount = amount / (10 ** decimals)
|
||||
# adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0))
|
||||
transfers.append({
|
||||
'mint': info.get('mint'),
|
||||
'amount': adjusted_amount,
|
||||
'source': info['source'],
|
||||
'destination': info['destination']
|
||||
})
|
||||
|
||||
# Identify token_in and token_out
|
||||
if len(transfers) >= 2:
|
||||
parsed_result["token_in"] = transfers[0]['mint']
|
||||
parsed_result["amount_in"] = transfers[0]['amount']
|
||||
parsed_result["token_out"] = transfers[-1]['mint']
|
||||
parsed_result["amount_out"] = transfers[-1]['amount']
|
||||
|
||||
# If mint is not provided, query the Solana network for the account data
|
||||
if parsed_result["token_in"] is None or parsed_result["token_out"] is None:
|
||||
#for transfer in transfers:
|
||||
# do only first and last transfer
|
||||
for transfer in [transfers[0], transfers[-1]]:
|
||||
if transfer['mint'] is None:
|
||||
# Query the Solana network for the account data
|
||||
account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source'])
|
||||
|
||||
if 'value' in account_data_result and 'data' in account_data_result['value']:
|
||||
account_data_value = account_data_result['value']
|
||||
account_data_data = account_data_value['data']
|
||||
if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
|
||||
account_data_info = account_data_data['parsed']['info']
|
||||
if 'mint' in account_data_info:
|
||||
transfer['mint'] = account_data_info['mint']
|
||||
if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]:
|
||||
await get_token_metadata_symbol(transfer['mint'])
|
||||
# get actual prices
|
||||
current_price = await get_token_prices([transfer['mint']])
|
||||
|
||||
if parsed_result["token_in"] is None:
|
||||
parsed_result["token_in"] = transfer['mint']
|
||||
parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol']
|
||||
parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
|
||||
parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']])
|
||||
elif parsed_result["token_out"] is None:
|
||||
parsed_result["token_out"] = transfer['mint']
|
||||
parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol']
|
||||
parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
|
||||
parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price']
|
||||
|
||||
pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', [])
|
||||
for balance in pre_balalnces:
|
||||
if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET:
|
||||
parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals']
|
||||
break
|
||||
|
||||
|
||||
# Calculate percentage swapped
|
||||
try:
|
||||
if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0:
|
||||
parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100
|
||||
else:
|
||||
# calculate based on total wallet value: FOLLOWED_WALLET_VALUE
|
||||
parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100
|
||||
except Exception as e:
|
||||
logging.error(f"Error calculating percentage swapped: {e}")
|
||||
|
||||
return parsed_result
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print("Error fetching transaction details:", e)
|
||||
|
||||
|
||||
async def solana_jsonrpc(method, params = None, jsonParsed = True):
|
||||
# target json example:
|
||||
# data = {
|
||||
# "jsonrpc": "2.0",
|
||||
# "id": 1,
|
||||
# "method": "getTransaction",
|
||||
# "params": [
|
||||
# tx_signature,
|
||||
# {
|
||||
# "encoding": "jsonParsed",
|
||||
# "maxSupportedTransactionVersion": 0
|
||||
# }
|
||||
# ]
|
||||
# }
|
||||
# if param is not array, make it array
|
||||
if not isinstance(params, list):
|
||||
params = [params]
|
||||
|
||||
data = {
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": method,
|
||||
"params": params or []
|
||||
}
|
||||
data["params"].append({"maxSupportedTransactionVersion": 0})
|
||||
if jsonParsed:
|
||||
data["params"][1]["encoding"] = "jsonParsed"
|
||||
|
||||
|
||||
try:
|
||||
# url = 'https://solana.drpc.org'
|
||||
response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data))
|
||||
response.raise_for_status() # Raises an error for bad responses
|
||||
result = response.json()
|
||||
if not 'result' in result or 'error' in result:
|
||||
print("Error fetching data from Solana RPC:", result)
|
||||
return None
|
||||
return result['result']
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching data from Solana RPC: {e}")
|
||||
return None
|
||||
|
||||
|
||||
|
||||
# # # # # # # # # # Functionality # # # # # # # # # #
|
||||
|
||||
|
||||
async def list_initial_wallet_states():
|
||||
global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES
|
||||
global TOKENS_INFO # new
|
||||
|
||||
followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
|
||||
your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
|
||||
|
||||
all_token_addresses = list(set(followed_wallet_balances.keys()) |
|
||||
set(your_wallet_balances.keys()) |
|
||||
set(TOKEN_ADDRESSES.values()))
|
||||
|
||||
TOKEN_PRICES = await get_token_prices(all_token_addresses)
|
||||
sol_price = await get_sol_price()
|
||||
|
||||
followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price)
|
||||
your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price)
|
||||
|
||||
|
||||
TOKEN_ADDRESSES = {
|
||||
address: info for address,
|
||||
info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0
|
||||
}
|
||||
logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}")
|
||||
|
||||
followed_wallet_state = []
|
||||
FOLLOWED_WALLET_VALUE = 0
|
||||
for address, info in followed_converted_balances.items():
|
||||
if info['value'] is not None and info['value'] > 0:
|
||||
followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})")
|
||||
FOLLOWED_WALLET_VALUE += info['value']
|
||||
|
||||
your_wallet_state = []
|
||||
YOUR_WALLET_VALUE = 0
|
||||
for address, info in your_converted_balances.items():
|
||||
if info['value'] is not None and info['value'] > 0:
|
||||
your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}")
|
||||
YOUR_WALLET_VALUE += info['value']
|
||||
|
||||
message = (
|
||||
f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n"
|
||||
f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
|
||||
f"{chr(10).join(followed_wallet_state)}\n"
|
||||
f"<b>Total Value:</b> {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
|
||||
f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
|
||||
f"{chr(10).join(your_wallet_state)}\n"
|
||||
f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
|
||||
f"<b>Monitored Tokens:</b>\n"
|
||||
f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}"
|
||||
)
|
||||
|
||||
logging.info(message)
|
||||
await send_telegram_message(message)
|
||||
|
||||
# save token info to file
|
||||
await save_token_info()
|
||||
|
||||
def safe_get_property(info, property_name, default='Unknown'):
|
||||
if not isinstance(info, dict):
|
||||
@@ -952,12 +536,13 @@ async def get_transaction_details_with_retry(transaction_id, retry_delay = 5, ma
|
||||
# query every 5 seconds for the transaction details until not None or 30 seconds
|
||||
for _ in range(max_retries):
|
||||
try:
|
||||
tx_details = await get_transaction_details_rpc(transaction_id)
|
||||
tx_details = await solanaAPI.get_transaction_details_rpc(transaction_id)
|
||||
if tx_details is not None:
|
||||
break
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching transaction details for '{transaction_id}': {e}")
|
||||
logging.info(f"({_} of {max_retries}) Waiting for transaction details for {transaction_id}")
|
||||
logging.error(f"Error fetching transaction details: {e}")
|
||||
retry_delay = retry_delay * 1.2
|
||||
logging.info(f"({_} of {max_retries}) Waiting for transaction details for {transaction_id}. retry in {retry_delay} s.")
|
||||
await asyncio.sleep(retry_delay)
|
||||
retry_delay *= 1.2
|
||||
return tx_details
|
||||
@@ -1006,7 +591,7 @@ async def process_log(log_result):
|
||||
|
||||
before_source_balance = 0
|
||||
source_token_change = 0
|
||||
|
||||
|
||||
i = 0
|
||||
while i < len(logs):
|
||||
log_entry = logs[i]
|
||||
@@ -1029,7 +614,7 @@ async def process_log(log_result):
|
||||
|
||||
i += 1
|
||||
|
||||
# calculatte percentage swapped by digging before_source_balance, source_token_change and after_source_balance
|
||||
# calculate percentage swapped by digging before_source_balance, source_token_change and after_source_balance
|
||||
|
||||
# "Program log: before_source_balance: 19471871, before_destination_balance: 0, amount_in: 19471871, expect_amount_out: 770877527, min_return: 763168752",
|
||||
# "Program log: after_source_balance: 0, after_destination_balance: 770570049",
|
||||
@@ -1080,14 +665,15 @@ async def process_log(log_result):
|
||||
f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} ({tr_details['percentage_swapped']:.2f}% ) swapped for " # ({tr_details['token_in']}) ({tr_details['token_out']})
|
||||
f"{tr_details['symbol_out']} \n"
|
||||
)
|
||||
await send_telegram_message(message_text)
|
||||
await telegram_utils.send_telegram_message(message_text)
|
||||
await follow_move(tr_details)
|
||||
await save_token_info()
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error aquiring log details and following: {e}")
|
||||
error_logger.info(f"Error aquiring log details and following:\n {tr_details}")
|
||||
await send_telegram_message(f"Not followed! Error following move.")
|
||||
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error processing log: {e}")
|
||||
@@ -1095,6 +681,7 @@ async def process_log(log_result):
|
||||
|
||||
PROCESSING_LOG = False
|
||||
return tr_details
|
||||
|
||||
# "Program log: Instruction: Swap2",
|
||||
# "Program log: order_id: 13985890735038016",
|
||||
# "Program log: AbrMJWfDVRZ2EWCQ1xSCpoVeVgZNpq1U2AoYG98oRXfn", source
|
||||
@@ -1136,14 +723,13 @@ def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float:
|
||||
|
||||
|
||||
async def follow_move(move):
|
||||
tx_details = None
|
||||
your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
||||
your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
|
||||
if your_balance_info is not None:
|
||||
# Use the balance
|
||||
print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
|
||||
else:
|
||||
print("No ballance found for {move['symbol_in']}. Skipping move.")
|
||||
print(f"No ballance found for {move['symbol_in']}. Skipping move.")
|
||||
await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
|
||||
return
|
||||
|
||||
@@ -1152,12 +738,12 @@ async def follow_move(move):
|
||||
|
||||
token_info = TOKENS_INFO.get(move['token_in'])
|
||||
token_name_in = token_info.get('symbol') or await get_token_metadata(move['token_in'])
|
||||
token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await get_token_metadata_symbol(move['token_out'])
|
||||
token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await solanaAPI.get_token_metadata_symbol(move['token_out'])
|
||||
|
||||
if not your_balance:
|
||||
msg = f"<b>Move not followed:</b>\nNo balance found for token {move['symbol_in']}. Cannot follow move."
|
||||
logging.warning(msg)
|
||||
await send_telegram_message(msg)
|
||||
await telegram_utils.send_telegram_message(msg)
|
||||
return
|
||||
|
||||
if FOLLOW_AMOUNT == 'percentage':
|
||||
@@ -1176,15 +762,13 @@ async def follow_move(move):
|
||||
return
|
||||
|
||||
amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have
|
||||
|
||||
amount = int(amount)
|
||||
logging.debug(f"Calculated amount in lamports: {amount}")
|
||||
|
||||
decimals = token_info.get('decimals')
|
||||
# Convert to lamports
|
||||
# if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9
|
||||
amount = int(amount_to_swap * 10**decimals)
|
||||
|
||||
amount = int(amount)
|
||||
logging.debug(f"Calculated amount in lamports: {amount}")
|
||||
|
||||
if your_balance < amount_to_swap: # should not happen
|
||||
msg = (
|
||||
@@ -1192,7 +776,7 @@ async def follow_move(move):
|
||||
f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway."
|
||||
)
|
||||
logging.warning(msg)
|
||||
await send_telegram_message(msg)
|
||||
await telegram_utils.send_telegram_message(msg)
|
||||
try:
|
||||
|
||||
try:
|
||||
@@ -1202,7 +786,7 @@ async def follow_move(move):
|
||||
)
|
||||
# logging.info(notification)
|
||||
# error_logger.info(notification)
|
||||
# await send_telegram_message(notification)
|
||||
# await telegram_utils.send_telegram_message(notification)
|
||||
except Exception as e:
|
||||
logging.error(f"Error sending notification: {e}")
|
||||
|
||||
@@ -1215,7 +799,7 @@ async def follow_move(move):
|
||||
input_mint=move['token_in'],
|
||||
output_mint=move['token_out'],
|
||||
amount=amount,
|
||||
slippage_bps=100, # Increased to 1%
|
||||
slippage_bps=300, # Increased to 3%
|
||||
)
|
||||
logging.info(f"Initiating move. Transaction data:\n {transaction_data}")
|
||||
error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}")
|
||||
@@ -1232,7 +816,7 @@ async def follow_move(move):
|
||||
# append to notification
|
||||
notification += f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>{transaction_id}</a>"
|
||||
|
||||
await send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
|
||||
await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
|
||||
tx_details = await get_transaction_details_with_retry(transaction_id)
|
||||
|
||||
if tx_details is not None:
|
||||
@@ -1246,7 +830,7 @@ async def follow_move(move):
|
||||
# log the errors to /logs/errors.log
|
||||
error_logger.error(error_message)
|
||||
error_logger.exception(e)
|
||||
await send_telegram_message(error_message)
|
||||
await telegram_utils.send_telegram_message(error_message)
|
||||
amount = amount * 0.75
|
||||
|
||||
await get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
||||
@@ -1273,11 +857,6 @@ async def follow_move(move):
|
||||
)
|
||||
logging.info(notification)
|
||||
await send_telegram_message(notification)
|
||||
|
||||
# Log successful swap details
|
||||
success_logger_accounting_csv.info(
|
||||
f"{move['symbol_in']},{move['symbol_out']},{amount_to_swap:.6f},{tx_details['amount_out']:.6f},{move['amount_in_USD']:.2f},{tx_details['amount_out_USD']:.2f},{move['percentage_swapped']:.2f}"
|
||||
)
|
||||
except Exception as e:
|
||||
logging.error(f"Error sending notification: {e}")
|
||||
|
||||
@@ -1289,9 +868,9 @@ async def follow_move(move):
|
||||
error_logger.exception(e) \
|
||||
# if error_message contains 'Program log: Error: insufficient funds'
|
||||
if 'insufficient funds' in error_message:
|
||||
await send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.")
|
||||
await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.")
|
||||
else:
|
||||
await send_telegram_message(error_message)
|
||||
await telegram_utils.send_telegram_message(error_message)
|
||||
|
||||
|
||||
# Helper functions
|
||||
@@ -1302,7 +881,7 @@ SOLANA_ENDPOINTS = [
|
||||
# "wss://mainnet.rpcpool.com",
|
||||
]
|
||||
PING_INTERVAL = 30
|
||||
SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 10 minutes
|
||||
SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 10 minutes
|
||||
|
||||
|
||||
# async def heartbeat(websocket):
|
||||
@@ -1335,11 +914,11 @@ async def wallet_watch_loop():
|
||||
|
||||
subscription_id = await subscribe(websocket)
|
||||
if subscription_id is not None:
|
||||
await send_telegram_message(f"Solana mainnet connected ({subscription_id})...")
|
||||
# await send_telegram_message(f"Solana mainnet connected ({subscription_id})...")
|
||||
if _first_subscription:
|
||||
asyncio.create_task( list_initial_wallet_states())
|
||||
_first_subscription = False
|
||||
_process_task = asyncio.create_task(process_messages(websocket, subscription_id))
|
||||
_process_task = asyncio.create_task(process_messages(websocket))
|
||||
while True:
|
||||
try:# drop subscription now
|
||||
await process_messages(websocket, subscription_id)
|
||||
@@ -1363,7 +942,7 @@ async def wallet_watch_loop():
|
||||
# Already subscribed
|
||||
logger.info("Already subscribed, continuing with existing subscription")
|
||||
if subscription_id:
|
||||
process_task = asyncio.create_task(process_messages(websocket, subscription_id))
|
||||
process_task = asyncio.create_task(process_messages(websocket))
|
||||
|
||||
else:
|
||||
# process_messages completed (shouldn't happen unless there's an error)
|
||||
@@ -1379,9 +958,9 @@ async def wallet_watch_loop():
|
||||
logger.error(f"An unexpected error occurred: {e}")
|
||||
|
||||
await unsubscribe(websocket, subscription_id)
|
||||
# await send_telegram_message("reconnecting...")
|
||||
await send_telegram_message("reconnecting...")
|
||||
logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
|
||||
websocket.close()
|
||||
await websocket.close()
|
||||
except Exception as e:
|
||||
logger.error(f"An unexpected error occurred - breaking watch loop: {e}")
|
||||
|
||||
@@ -1401,22 +980,7 @@ async def subscribe(websocket):
|
||||
try:
|
||||
await websocket.send(json.dumps(request))
|
||||
logger.info("Subscription request sent")
|
||||
|
||||
response = await websocket.recv()
|
||||
response_data = json.loads(response)
|
||||
|
||||
if 'result' in response_data:
|
||||
subscription_id = response_data['result']
|
||||
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
|
||||
return subscription_id
|
||||
else:
|
||||
logger.warning(f"Unexpected response: {response_data}")
|
||||
return None
|
||||
except websockets.exceptions.ConnectionClosedError as e:
|
||||
logger.error(f"Connection closed unexpectedly: {e}")
|
||||
# await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
|
||||
await websocket.close()
|
||||
return None
|
||||
return await process_messages(websocket)
|
||||
except Exception as e:
|
||||
logger.error(f"An unexpected error occurred: {e}")
|
||||
return None
|
||||
@@ -1433,14 +997,23 @@ async def unsubscribe(websocket, subscription_id):
|
||||
logger.info(f"Unsubscribed from subscription id: {subscription_id}")
|
||||
subscription_id = None
|
||||
|
||||
async def process_messages(websocket, subscription_id):
|
||||
async def process_messages(websocket):
|
||||
try:
|
||||
while True:
|
||||
response = await websocket.recv()
|
||||
response_data = json.loads(response)
|
||||
logger.debug(f"Received response: {response_data}")
|
||||
|
||||
if 'params' in response_data:
|
||||
if 'result' in response_data:
|
||||
new_sub_id = response_data['result']
|
||||
if int(new_sub_id) > 1:
|
||||
subscription_id = new_sub_id
|
||||
logger.info(f"Subscription successful. New id: {subscription_id}")
|
||||
elif new_sub_id:
|
||||
logger.info(f"Existing subscription confirmed: {subscription_id}")
|
||||
else: return None
|
||||
return subscription_id
|
||||
elif 'params' in response_data:
|
||||
log = response_data['params']['result']
|
||||
logger.debug(f"Received transaction log: {log}")
|
||||
asyncio.create_task(process_log(log))
|
||||
@@ -1449,7 +1022,7 @@ async def process_messages(websocket, subscription_id):
|
||||
|
||||
except websockets.exceptions.ConnectionClosedError as e:
|
||||
logger.error(f"Connection closed unexpectedly: {e}")
|
||||
await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
|
||||
# await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
|
||||
pass
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Failed to decode JSON: {e}")
|
||||
@@ -1476,56 +1049,77 @@ async def check_PK():
|
||||
if not pk:
|
||||
logging.error("Private key not found in environment variables. Will not be able to sign transactions.")
|
||||
# send TG warning message
|
||||
await send_telegram_message("<b>Warning:</b> Private key not found in environment variables. Will not be able to sign transactions.")
|
||||
await telegram_utils.send_telegram_message("<b>Warning:</b> Private key not found in environment variables. Will not be able to sign transactions.")
|
||||
|
||||
|
||||
|
||||
# Convert Flask app to ASGI
|
||||
asgi_app = WsgiToAsgi(app)
|
||||
solanaAPI = SolanaAPI()
|
||||
|
||||
async def main():
|
||||
global bot, PROCESSING_LOG
|
||||
# Initialize Telegram Bot
|
||||
# Create a custom connection pool
|
||||
conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit
|
||||
timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout
|
||||
global solanaAPI, bot, PROCESSING_LOG
|
||||
|
||||
bot = Bot(TELEGRAM_BOT_TOKEN) # , request=aiohttp.ClientSession(connector=conn_pool, timeout=timeout).request)
|
||||
|
||||
await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
|
||||
telegram_utils.initialize()
|
||||
await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")
|
||||
await check_PK()
|
||||
# new: restart wallet_watch_loop every hour
|
||||
while True:
|
||||
wallet_watch_task = asyncio.create_task(wallet_watch_loop())
|
||||
await solanaAPI.wallet_watch_loop()
|
||||
|
||||
# while True:
|
||||
# wallet_watch_task = asyncio.create_task(solanaAPI.wallet_watch_loop())
|
||||
|
||||
try:
|
||||
# Wait for an hour or until the task completes, whichever comes first
|
||||
await asyncio.wait_for(wallet_watch_task, timeout=3600)
|
||||
except asyncio.TimeoutError:
|
||||
# If an hour has passed, cancel the task if not PROCESSING
|
||||
if PROCESSING_LOG:
|
||||
logging.info("wallet_watch_loop is processing logs. Will not restart.")
|
||||
await send_telegram_message("wallet_watch_loop is processing logs. Will not restart.")
|
||||
else:
|
||||
wallet_watch_task.cancel()
|
||||
try:
|
||||
await wallet_watch_task
|
||||
except asyncio.CancelledError:
|
||||
logging.info("wallet_watch_loop was cancelled after running for an hour")
|
||||
except Exception as e:
|
||||
logging.error(f"Error in wallet_watch_loop: {str(e)}")
|
||||
await send_telegram_message(f"Error in wallet_watch_loop: {str(e)}")
|
||||
# try:
|
||||
# # Wait for an hour or until the task completes, whichever comes first
|
||||
# await asyncio.wait_for(wallet_watch_task, timeout=3600)
|
||||
# except asyncio.TimeoutError:
|
||||
# # If an hour has passed, cancel the task if not PROCESSING
|
||||
# if PROCESSING_LOG:
|
||||
# logging.info("wallet_watch_loop is processing logs. Will not restart.")
|
||||
# await telegram_utils.send_telegram_message("wallet_watch_loop is processing logs. Will not restart.")
|
||||
# else:
|
||||
# wallet_watch_task.cancel()
|
||||
# try:
|
||||
# await wallet_watch_task
|
||||
# except asyncio.CancelledError:
|
||||
# logging.info("wallet_watch_loop was cancelled after running for an hour")
|
||||
# except Exception as e:
|
||||
# logging.error(f"Error in wallet_watch_loop: {str(e)}")
|
||||
# await telegram_utils.send_telegram_message(f"Error in wallet_watch_loop: {str(e)}")
|
||||
|
||||
logging.info("Restarting wallet_watch_loop")
|
||||
await send_telegram_message("Restarting wallet_watch_loop")
|
||||
# logging.info("Restarting wallet_watch_loop")
|
||||
# await telegram_utils.send_telegram_message("Restarting wallet_watch_loop")
|
||||
|
||||
async def run_flask():
|
||||
loop = asyncio.get_running_loop()
|
||||
await loop.run_in_executor(None, lambda: app.run(debug=False, port=3001, use_reloader=False))
|
||||
|
||||
|
||||
def run_asyncio_loop(loop):
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_forever()
|
||||
|
||||
async def run_all():
|
||||
await asyncio.gather(
|
||||
main(),
|
||||
run_flask()
|
||||
)
|
||||
main_task = asyncio.create_task(main())
|
||||
await main_task
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(run_all())
|
||||
# Create a new event loop
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
# Start the asyncio loop in a separate thread
|
||||
thread = Thread(target=run_asyncio_loop, args=(loop,))
|
||||
thread.start()
|
||||
|
||||
# Schedule the run_all coroutine in the event loop
|
||||
asyncio.run_coroutine_threadsafe(run_all(), loop)
|
||||
|
||||
# Run Uvicorn in the main thread
|
||||
uvicorn.run(
|
||||
"app:asgi_app", # Replace 'app' with the actual name of this Python file if different
|
||||
host="127.0.0.1",
|
||||
port=3001,
|
||||
log_level="debug",
|
||||
reload=True
|
||||
)
|
||||
|
||||
# When Uvicorn exits, stop the asyncio loop
|
||||
loop.call_soon_threadsafe(loop.stop)
|
||||
thread.join()
|
||||
Reference in New Issue
Block a user