453 lines
19 KiB
Python
453 lines
19 KiB
Python
import asyncio
|
|
import uvicorn
|
|
from asgiref.wsgi import WsgiToAsgi
|
|
import websockets
|
|
import json
|
|
|
|
import datetime
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
|
|
from threading import Thread
|
|
from solana.rpc.async_api import AsyncClient
|
|
from solders.transaction import VersionedTransaction
|
|
from solana.rpc.types import TxOpts
|
|
from solders.keypair import Keypair
|
|
from jupiter_python_sdk.jupiter import Jupiter
|
|
from solana.rpc.commitment import Processed
|
|
|
|
from modules.webui import init_app
|
|
from modules.storage import init_db, store_transaction
|
|
from modules.utils import telegram_utils, logging, get_pk
|
|
from modules.log_processor import watch_for_new_logs
|
|
from modules.SolanaAPI import SAPI
|
|
|
|
|
|
# config = load_config()
|
|
load_dotenv()
|
|
load_dotenv('.env.secret')
|
|
# Configuration
|
|
|
|
|
|
from config import (DO_WATCH_WALLET, logging, logger)
|
|
|
|
|
|
|
|
|
|
# # # # # # # # # # TELEGRAM # # # # # # # # # #
|
|
# if not telegram_utils.bot:
|
|
# try:
|
|
# asyncio.run(telegram_utils.initialize())
|
|
# except Exception as e:
|
|
# logging.error(f"Error initializing Telegram bot: {str(e)}")
|
|
# async def telegram_utils.send_telegram_message(message):
|
|
# try:
|
|
# await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML)
|
|
# logging.info(f"Telegram message sent: {message}")
|
|
# # logging.info(f"Telegram message dummy sent: {message}")
|
|
# except Exception as e:
|
|
# logging.error(f"Error sending Telegram message: {str(e)}")
|
|
|
|
|
|
|
|
# # # # # # # # # # DATABASE # # # # # # # # # #
|
|
|
|
|
|
|
|
# # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # #
|
|
|
|
|
|
# # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # #
|
|
|
|
|
|
|
|
|
|
|
|
# # # # # # # # # # Functionality # # # # # # # # # #
|
|
|
|
|
|
|
|
|
|
|
|
async def save_log(log):
    """Dump one RPC log payload to a timestamped JSON file under ./logs.

    Args:
        log: JSON-serialisable object to persist.

    Any exception raised while creating the directory or writing the file is
    logged and swallowed, so this coroutine always returns None.
    """
    try:
        os.makedirs('./logs', exist_ok=True)

        # Microsecond resolution keeps file names of rapid successive logs apart.
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        target_path = f"./logs/log_{stamp}.json"

        with open(target_path, 'w') as out_file:
            json.dump(log, out_file, indent=2)
    except Exception as err:
        logging.error(f"Error saving RPC log: {err}")
|
|
|
|
|
|
# True while at least one process_log call is inside its main section.
# NOTE(review): process_log calls are scheduled as concurrent tasks, so one
# call finishing clears the flag even if another is still running.
PROCESSING_LOG = False

# Exact program-log lines that mark a transaction as containing a swap.
_SWAP_LOG_MARKERS = (
    'Program log: Instruction: Swap',
    'Program log: Instruction: Swap2',
    'Program log: Instruction: SwapExactAmountIn',
    'Program log: Instruction: SwapV2',
)


def _scaled_amount(fragment):
    """Parse the number after the last ':' in *fragment* and divide it by 10**6."""
    return float(fragment.split(":")[-1].strip()) / 10 ** 6  # Assuming 6 decimals


def _parse_swap_logs(logs, tr_details):
    """Fill *tr_details* in place from program log lines.

    Returns:
        (before_source_balance, source_token_change), both scaled by 10**6 and
        left at 0 when the corresponding log line is absent.
    """
    before_source_balance = 0
    source_token_change = 0

    for i, log_entry in enumerate(logs):
        # The first 'order_id' line is expected to be followed by the
        # token_in and token_out lines.
        if tr_details["order_id"] is None and "order_id" in log_entry:
            tr_details["order_id"] = log_entry.split(":")[-1].strip()
            # Guard against a truncated log: leaving the tokens as None makes
            # the caller fall back to the transaction lookup instead of
            # aborting on an IndexError.
            if i + 2 < len(logs):
                tr_details["token_in"] = logs[i + 1].split(":")[-1].strip()
                tr_details["token_out"] = logs[i + 2].split(":")[-1].strip()

        # e.g. "Program log: source_token_change: 19471871, destination_token_change: 770570049"
        if "source_token_change" in log_entry:
            for part in log_entry.split(", "):
                if "source_token_change" in part:
                    source_token_change = _scaled_amount(part)
                    tr_details["amount_in"] = source_token_change
                elif "destination_token_change" in part:
                    tr_details["amount_out"] = _scaled_amount(part)

        # e.g. "Program log: before_source_balance: 19471871, before_destination_balance: 0, amount_in: ..."
        if "before_source_balance" in log_entry:
            for part in log_entry.split(", "):
                if "before_source_balance" in part:
                    before_source_balance = _scaled_amount(part)

    return before_source_balance, source_token_change


async def process_log(log_result):
    """Inspect one log notification and, if it is a swap, report and follow it.

    Args:
        log_result: mapping with a 'value' entry holding 'err', 'logs' and
            'signature' (the last is only read for swaps).

    Returns:
        None when the notification carries an error. Otherwise the swap
        details: the default dict when no swap marker is present, or the dict
        filled from the logs / whatever SAPI.get_transaction_details_info
        returned when the logs were incomplete.

    Errors raised while parsing, notifying or following are logged and
    swallowed.
    """
    global PROCESSING_LOG
    tr_details = {
        "order_id": None,
        "token_in": None,
        "token_out": None,
        "amount_in": 0,
        "amount_out": 0,
        "amount_in_USD": 0,
        "amount_out_USD": 0,
        "percentage_swapped": 0
    }

    if log_result['value']['err']:
        return

    # A missing or null 'logs' field is treated as "no log lines".
    logs = log_result['value'].get('logs') or []
    try:
        PROCESSING_LOG = True

        # Detect swap operations in logs
        if any(marker in logs for marker in _SWAP_LOG_MARKERS):
            # Save the log to a file
            await save_log(log_result)
            tx_signature_str = log_result['value']['signature']

            before_source_balance, source_token_change = _parse_swap_logs(logs, tr_details)

            try:
                # GET DETAILS FROM TRANSACTION IF NOT FOUND IN LOGS
                if tr_details["token_in"] is None or tr_details["token_out"] is None or tr_details["amount_in"] == 0 or tr_details["amount_out"] == 0:
                    logging.warning("Incomplete swap details found in logs. Getting details from transaction")
                    tr_details = await SAPI.get_transaction_details_info(tx_signature_str, logs)

                # Percentage of the source balance that was swapped, from the log figures.
                if before_source_balance > 0 and source_token_change > 0:
                    tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100
                    # dirty fix for percentage > 100 (decimals 9 but expecting 6)
                    if tr_details["percentage_swapped"] > 100:
                        tr_details["percentage_swapped"] = tr_details["percentage_swapped"] / 1000

                try:
                    token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]]
                    token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]]

                    tr_details["symbol_in"] = token_in.get('symbol')
                    tr_details["symbol_out"] = token_out.get('symbol')
                    tr_details['amount_in_USD'] = tr_details['amount_in'] * token_in.get('price', 0)
                    tr_details['amount_out_USD'] = tr_details['amount_out'] * token_out.get('price', 0)
                except Exception as e:
                    logging.error(f"Error fetching token prices: {e}")

                # NOTE(review): if the token lookup above failed and the details
                # carry no 'symbol_in'/'symbol_out', building this message raises
                # KeyError and the move is NOT followed (handled below).
                message_text = (
                    f"<b>Swap detected: </b>\n"
                    f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} ({tr_details['percentage_swapped']:.2f}% ) swapped for "
                    f"{tr_details['symbol_out']} \n"
                )
                await telegram_utils.send_telegram_message(message_text)
                await SAPI.follow_move(tr_details)
                await SAPI.save_token_info()

            except Exception as e:
                logging.error(f"Error aquiring log details and following: {e}")
                await telegram_utils.send_telegram_message(f"Not followed! Error following move.")

    except Exception as e:
        logging.error(f"Error processing log: {e}")
    finally:
        # Also runs when the task is cancelled, so the flag cannot stay stuck.
        PROCESSING_LOG = False

    return tr_details
|
|
|
|
|
|
|
|
# async def follow_move_legacy(move):
|
|
# global pk
|
|
# if pk is None:
|
|
# pk = await get_pk()
|
|
# your_balances = await SAPI.dex.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
|
# your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
|
|
# if your_balance_info is not None:
|
|
# # Use the balance
|
|
# print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
|
|
# else:
|
|
# print(f"No ballance found for {move['symbol_in']}. Skipping move.")
|
|
# await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
|
|
# return
|
|
|
|
# your_balance = your_balance_info['amount']
|
|
|
|
|
|
# token_info = SAPI.dex.TOKENS_INFO.get(move['token_in'])
|
|
# token_name_in = token_info.get('symbol') or await SAPI.get_token_metadata(move['token_in'])
|
|
# token_name_out = SAPI.dex.TOKENS_INFO[move['token_out']].get('symbol') or await SAPI.get_token_metadata_symbol(move['token_out'])
|
|
|
|
# if not your_balance:
|
|
# msg = f"<b>Move not followed:</b>\nNo balance found for token {move['symbol_in']}. Cannot follow move."
|
|
# logging.warning(msg)
|
|
# await telegram_utils.send_telegram_message(msg)
|
|
# return
|
|
|
|
# if FOLLOW_AMOUNT == 'percentage':
|
|
# # Calculate the amount to swap based on the same percentage as the followed move
|
|
# amount_to_swap = your_balance * (move['percentage_swapped'] / 100)
|
|
# elif FOLLOW_AMOUNT == 'exact':
|
|
# amount_to_swap = move['amount_in']
|
|
# else:
|
|
# try:
|
|
# fixed_amount = float(FOLLOW_AMOUNT) # un USD
|
|
# fixed_amount_in_token = fixed_amount / move["token_in_price"]
|
|
# amount_to_swap = min(fixed_amount_in_token, your_balance)
|
|
# except ValueError:
|
|
# msg = f"<b>Move not followed:</b>\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number."
|
|
# logging.warning(msg)
|
|
# await telegram_utils.send_telegram_message(msg)
|
|
# return
|
|
|
|
# amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have
|
|
|
|
# decimals = token_info.get('decimals')
|
|
# # Convert to lamports
|
|
# # if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9
|
|
# amount = int(amount_to_swap * 10**decimals)
|
|
# amount = int(amount)
|
|
# logging.debug(f"Calculated amount in lamports: {amount}")
|
|
|
|
# if your_balance < amount_to_swap: # should not happen
|
|
# msg = (
|
|
# f"<b>Warning:</b>\n"
|
|
# f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway."
|
|
# )
|
|
# logging.warning(msg)
|
|
# await telegram_utils.send_telegram_message(msg)
|
|
# try:
|
|
|
|
# try:
|
|
# notification = (
|
|
# f"<b>Initiating move:</b>\n"
|
|
# f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}"
|
|
# + (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "")
|
|
# )
|
|
# # logging.info(notification)
|
|
# # error_logger.info(notification)
|
|
# await telegram_utils.send_telegram_message(notification)
|
|
# except Exception as e:
|
|
# logging.error(f"Error sending notification: {e}")
|
|
|
|
# for retry in range(3):
|
|
# try:
|
|
# private_key = Keypair.from_bytes(base58.b58decode(pk))
|
|
# async_client = AsyncClient(SOLANA_WS_URL)
|
|
# jupiter = Jupiter(async_client, private_key)
|
|
# transaction_data = await jupiter.swap(
|
|
# input_mint=move['token_in'],
|
|
# output_mint=move['token_out'],
|
|
# amount=amount,
|
|
# slippage_bps=300, # Increased to 3%
|
|
# )
|
|
# logging.info(f"Initiating move. Transaction data:\n {transaction_data}")
|
|
# error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}")
|
|
# raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
|
|
# message = raw_transaction.message
|
|
# signature = private_key.sign_message(message.to_bytes_versioned())
|
|
# signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
|
|
# opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
|
|
|
|
# # send the transaction
|
|
# result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
|
|
|
|
# transaction_id = json.loads(result.to_json())['result']
|
|
# print(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}")
|
|
# # append to notification
|
|
# notification += f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>{transaction_id}</a>"
|
|
|
|
# await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
|
|
# tx_details = await SAPI.get_transaction_details_with_retry(transaction_id)
|
|
|
|
# if tx_details is not None:
|
|
# break
|
|
# else:
|
|
# logging.warning(f"Failed to get transaction details for {transaction_id}. Probably transaction failed. Retrying again...")
|
|
# await asyncio.sleep(3)
|
|
# except Exception as e:
|
|
# error_message = f"<b>Move Failed:</b>\n{str(e)}</b>\n{transaction_data}</b>\n{move}"
|
|
# logging.error(error_message)
|
|
# # log the errors to /logs/errors.log
|
|
# error_logger.error(error_message)
|
|
# error_logger.exception(e)
|
|
# await telegram_utils.send_telegram_message(error_message)
|
|
# amount = amount * 0.75
|
|
|
|
# await SAPI.dex.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
|
|
|
# try:
|
|
# if tx_details is None:
|
|
# logging.info(f"Failed to get transaction details for {transaction_id}")
|
|
# notification = (
|
|
# f"<b>Move Followed, failed to get transaction details.</b>\n"
|
|
# f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['token_in']}) "
|
|
# f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
|
|
# f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>{transaction_id}</a>"
|
|
# # log_successful_swap ()
|
|
# )
|
|
|
|
# else:
|
|
# notification = (
|
|
# f"<b>Move Followed:</b>\n"
|
|
# f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['symbol_in']}) "
|
|
# f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
|
|
# f"for {tx_details['amount_out']:.2f} {token_name_out}"
|
|
# # f"Amount In USD: {tr_details['amount_in_USD']}\n"
|
|
# f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>{transaction_id}</a>"
|
|
# )
|
|
# logging.info(notification)
|
|
# await telegram_utils.send_telegram_message(notification)
|
|
# except Exception as e:
|
|
# logging.error(f"Error sending notification: {e}")
|
|
|
|
# except Exception as e:
|
|
# error_message = f"<b>Swap Follow Error:</b>\n{str(e)}"
|
|
# logging.error(error_message)
|
|
# # log the errors to /logs/errors.log
|
|
# error_logger.error(error_message)
|
|
# error_logger.exception(e) \
|
|
# # if error_message contains 'Program log: Error: insufficient funds'
|
|
# if 'insufficient funds' in error_message:
|
|
# await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.")
|
|
# else:
|
|
# await telegram_utils.send_telegram_message(error_message)
|
|
|
|
|
|
# Helper functions
|
|
|
|
|
|
# Strong references to in-flight process_log tasks. The event loop only keeps
# weak references, so an unreferenced task may be garbage-collected mid-run.
_BACKGROUND_TASKS = set()


async def process_messages(websocket):
    """Read messages from *websocket* until a subscription reply or an error.

    Messages with a 'params' entry are handed to process_log as background
    tasks. The first message with a 'result' entry ends the loop.

    Args:
        websocket: object exposing an awaitable ``recv()`` that yields JSON text.

    Returns:
        The value of 'result' when ``int(result) > 1``; otherwise None
        (including when the connection closes or any error is logged).
    """
    # Bound up front so the "existing subscription" branch cannot hit an
    # unbound local.
    subscription_id = None
    try:
        while True:
            response = await websocket.recv()
            response_data = json.loads(response)
            logger.debug(f"Received response: {response_data}")

            if 'result' in response_data:
                new_sub_id = response_data['result']
                if int(new_sub_id) > 1:
                    subscription_id = new_sub_id
                    logger.info(f"Subscription successful. New id: {subscription_id}")
                elif new_sub_id:
                    logger.info(f"Existing subscription confirmed: {new_sub_id}")
                else:
                    return None
                return subscription_id
            elif 'params' in response_data:
                log = response_data['params']['result']
                logger.debug(f"Received transaction log: {log}")
                task = asyncio.create_task(process_log(log))
                _BACKGROUND_TASKS.add(task)
                task.add_done_callback(_BACKGROUND_TASKS.discard)
            else:
                logger.warning(f"Unexpected response: {response_data}")

    except websockets.exceptions.ConnectionClosedError as e:
        logger.error(f"Connection closed unexpectedly: {e}")
        # await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...")
    except json.JSONDecodeError as e:
        logger.error(f"Failed to decode JSON: {e}")
    except Exception as e:
        logger.error(f"An unexpected error occurred: {e}")
|
|
|
|
|
|
# Assigned by main() from get_pk() once the event loop is running.
pk = None

# Web UI application and its ASGI wrapper (Flask app converted to ASGI).
app = init_app()
# app = init_app(follow_move_legacy)
asgi_app = WsgiToAsgi(app)
|
|
|
|
async def restartable_task(task_func, *args, **kwargs):
    """Run ``task_func(*args, **kwargs)`` forever, restarting it after it ends.

    Args:
        task_func: coroutine function to supervise.
        *args, **kwargs: forwarded unchanged to every invocation.

    Exceptions derived from ``Exception`` are logged and reported via
    Telegram, then the task is restarted. Anything else (for example task
    cancellation) propagates to the caller. This coroutine never returns
    normally.
    """
    # functools.partial objects and some callables have no __name__.
    task_name = getattr(task_func, "__name__", repr(task_func))
    while True:
        try:
            await task_func(*args, **kwargs)
        except Exception as e:
            logging.error(f"Error in task {task_name}: {e}")
            try:
                await telegram_utils.send_telegram_message(f"Error in task {task_name}: {e}")
            except Exception as notify_error:
                # A failed notification must not take the supervisor down.
                logging.error(f"Error sending Telegram message: {notify_error}")
        # Wait before retrying; also applies after a normal return so a task
        # that finishes immediately cannot turn this into a busy loop.
        await asyncio.sleep(5)
|
|
|
|
async def main():
    """Initialise the agent and keep its background tasks running.

    Loads the private key into the module-level ``pk``, initialises Telegram,
    announces start-up, then starts the supervised log processor and, when
    DO_WATCH_WALLET is set, the wallet watcher. Stays alive for as long as the
    supervised tasks run, because ``asyncio.run`` cancels every remaining task
    as soon as this coroutine returns.
    """
    global pk

    pk = await get_pk()
    await telegram_utils.initialize()
    await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")

    # Keep references: the event loop holds tasks only weakly.
    # Start the log processor
    supervisors = [asyncio.create_task(restartable_task(watch_for_new_logs))]
    # process_transaction

    if DO_WATCH_WALLET:
        supervisors.append(asyncio.create_task(restartable_task(SAPI.wallet_watch_loop)))
        # NOTE(review): this runs a second, unsupervised wallet_watch_loop next
        # to the supervised one above; kept as in the original, confirm that
        # two concurrent watchers are intended.
        await SAPI.wallet_watch_loop()

    # restartable_task never returns normally, so this blocks until cancelled.
    await asyncio.gather(*supervisors)
|
|
|
|
def run_asyncio_tasks():
    """Process entry point: run main() to completion on a fresh event loop."""
    entry_coroutine = main()
    asyncio.run(entry_coroutine)
|
|
|
|
if __name__ == '__main__':
    import multiprocessing

    # The asyncio side (run_asyncio_tasks) gets its own process so it runs
    # independently of the web server started below.
    process = multiprocessing.Process(target=run_asyncio_tasks)
    process.start()

    # Serve the ASGI-wrapped web UI.
    server_options = {
        "host": "0.0.0.0",
        "port": 3001,
        "log_level": "info",  # "debug"
        "reload": True,
    }
    uvicorn.run("app:asgi_app", **server_options)

    # Once the server call returns, wait for the asyncio process to finish.
    process.join()
|