wip
This commit is contained in:
parent
800cbede4d
commit
885eb523f6
@ -14,6 +14,7 @@ DEVELOPER_CHAT_ID="777826553"
|
||||
TELEGRAM_BOT_TOKEN="6749075936:AAHUHiPTDEIu6JH7S2fQdibwsu6JVG3FNG0"
|
||||
|
||||
DISPLAY_CURRENCY=USD
|
||||
FOLLOW_AMOUNT=2
|
||||
|
||||
# Niki's to Sync: [PROD]
|
||||
FOLLOWED_WALLET="7keSmTZozjmuX66gd9GBSJYEHnMqsyutWpvuuKtXZKDH"
|
||||
|
@ -3,24 +3,21 @@ import uvicorn
|
||||
from asgiref.wsgi import WsgiToAsgi
|
||||
import websockets
|
||||
import json
|
||||
from flask import Flask, render_template, request, jsonify
|
||||
|
||||
import datetime
|
||||
import base64
|
||||
import os
|
||||
import base58
|
||||
from dotenv import load_dotenv, set_key
|
||||
import aiohttp
|
||||
import requests
|
||||
import re
|
||||
import random
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
from threading import Thread
|
||||
from solana.rpc.async_api import AsyncClient
|
||||
from solders.transaction import VersionedTransaction
|
||||
from solana.rpc.types import TxOpts
|
||||
from solana.rpc.commitment import Confirmed, Finalized, Processed
|
||||
from solders.keypair import Keypair
|
||||
from jupiter_python_sdk.jupiter import Jupiter
|
||||
from solana.rpc.commitment import Processed
|
||||
|
||||
from modules.webui import init_app
|
||||
from modules.storage import init_db, store_transaction
|
||||
@ -192,7 +189,7 @@ async def process_log(log_result):
|
||||
f"{tr_details['symbol_out']} \n"
|
||||
)
|
||||
await telegram_utils.send_telegram_message(message_text)
|
||||
await follow_move(tr_details)
|
||||
await SAPI.follow_move(tr_details)
|
||||
await SAPI.save_token_info()
|
||||
|
||||
except Exception as e:
|
||||
@ -210,10 +207,11 @@ async def process_log(log_result):
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
async def follow_move(move):
|
||||
your_balances = await SAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
||||
async def follow_move_legacy(move):
|
||||
global pk
|
||||
if pk is None:
|
||||
pk = await get_pk()
|
||||
your_balances = await SAPI.dex.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
||||
your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
|
||||
if your_balance_info is not None:
|
||||
# Use the balance
|
||||
@ -243,8 +241,9 @@ async def follow_move(move):
|
||||
amount_to_swap = move['amount_in']
|
||||
else:
|
||||
try:
|
||||
fixed_amount = float(FOLLOW_AMOUNT)
|
||||
amount_to_swap = min(fixed_amount, your_balance)
|
||||
fixed_amount = float(FOLLOW_AMOUNT) # un USD
|
||||
fixed_amount_in_token = fixed_amount / move["token_in_price"]
|
||||
amount_to_swap = min(fixed_amount_in_token, your_balance)
|
||||
except ValueError:
|
||||
msg = f"<b>Move not followed:</b>\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number."
|
||||
logging.warning(msg)
|
||||
@ -272,11 +271,12 @@ async def follow_move(move):
|
||||
try:
|
||||
notification = (
|
||||
f"<b>Initiating move:</b>\n"
|
||||
f"Swapping {move['percentage_swapped']:.2f}% ({amount_to_swap:.2f}) {token_name_in} for {token_name_out}"
|
||||
f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}"
|
||||
+ (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "")
|
||||
)
|
||||
# logging.info(notification)
|
||||
# error_logger.info(notification)
|
||||
# await telegram_utils.send_telegram_message(notification)
|
||||
await telegram_utils.send_telegram_message(notification)
|
||||
except Exception as e:
|
||||
logging.error(f"Error sending notification: {e}")
|
||||
|
||||
@ -324,7 +324,7 @@ async def follow_move(move):
|
||||
await telegram_utils.send_telegram_message(error_message)
|
||||
amount = amount * 0.75
|
||||
|
||||
await SAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
||||
await SAPI.dex.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
||||
|
||||
try:
|
||||
if tx_details is None:
|
||||
@ -401,7 +401,7 @@ async def process_messages(websocket):
|
||||
|
||||
|
||||
pk = None
|
||||
app = init_app()
|
||||
app = init_app(follow_move_legacy)
|
||||
# Convert Flask app to ASGI
|
||||
asgi_app = WsgiToAsgi(app)
|
||||
|
||||
@ -427,7 +427,7 @@ if __name__ == '__main__':
|
||||
# Run the ASGI server
|
||||
uvicorn.run(
|
||||
"app:asgi_app",
|
||||
host="127.0.0.1",
|
||||
host="0.0.0.0",
|
||||
port=3001,
|
||||
log_level="debug",
|
||||
reload=True
|
||||
|
@ -68,4 +68,5 @@ def get_config():
|
||||
"SOLANA_HTTP_URL": SOLANA_HTTP_URL,
|
||||
"DISPLAY_CURRENCY": DISPLAY_CURRENCY,
|
||||
"BOT_NAME": BOT_NAME,
|
||||
"FOLLOW_AMOUNT": FOLLOW_AMOUNT,
|
||||
}
|
@ -1,10 +1,14 @@
|
||||
import struct
|
||||
import sys
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
from solders import message
|
||||
# Get the directory where the current script is located
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
|
||||
from dexscreener import DexscreenerClient
|
||||
from solana.rpc.types import TokenAccountOpts, TxOpts
|
||||
@ -16,16 +20,18 @@ from solana.transaction import Transaction
|
||||
from spl.token.client import Token
|
||||
from base64 import b64decode
|
||||
import base58
|
||||
from solders.rpc.requests import GetTransaction
|
||||
from solders.signature import Signature
|
||||
from solders.pubkey import Pubkey
|
||||
from solders.keypair import Keypair
|
||||
from threading import Thread
|
||||
from solana.rpc.async_api import AsyncClient
|
||||
from solana.rpc.types import TxOpts
|
||||
from solana.rpc.commitment import Confirmed, Finalized, Processed
|
||||
|
||||
from solders.transaction import VersionedTransaction
|
||||
from solders.transaction import Transaction
|
||||
from solders.message import Message
|
||||
from solders.instruction import Instruction
|
||||
from solders.hash import Hash
|
||||
from solders.instruction import CompiledInstruction
|
||||
from solders.keypair import Keypair
|
||||
from solana.rpc.async_api import AsyncClient
|
||||
from solana.rpc.commitment import Processed
|
||||
from solana.rpc.types import TxOpts
|
||||
|
||||
from jupiter_python_sdk.jupiter import Jupiter
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
@ -37,6 +43,7 @@ from datetime import datetime
|
||||
from solana.rpc.types import TokenAccountOpts, TxOpts
|
||||
from typing import List, Dict, Any, Tuple
|
||||
import traceback
|
||||
import base64
|
||||
|
||||
# # # solders/solana libs (solana_client) # # #
|
||||
from spl.token._layouts import MINT_LAYOUT
|
||||
@ -53,9 +60,9 @@ logger = logging.getLogger(__name__)
|
||||
PING_INTERVAL = 30
|
||||
SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 1 minute
|
||||
|
||||
from config import ( FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS, YOUR_WALLET)
|
||||
from config import (FOLLOW_AMOUNT, FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS, YOUR_WALLET, SOLANA_WS_URL)
|
||||
|
||||
from modules.utils import telegram_utils, async_safe_call
|
||||
from modules.utils import telegram_utils, async_safe_call, get_pk
|
||||
|
||||
# Use the production Solana RPC endpoint
|
||||
solana_client = AsyncClient(SOLANA_HTTP_URL)
|
||||
@ -182,7 +189,7 @@ class SolanaWS:
|
||||
await self.websocket.close()
|
||||
logger.info("WebSocket connection closed")
|
||||
|
||||
async def solana_jsonrpc(method, params=None, jsonParsed=True):
|
||||
async def solana_jsonrpc(self, method, params=None, jsonParsed=True):
|
||||
if not isinstance(params, list):
|
||||
params = [params] if params is not None else []
|
||||
|
||||
@ -211,6 +218,8 @@ class SolanaWS:
|
||||
return None
|
||||
|
||||
class SolanaAPI:
|
||||
|
||||
pk = None
|
||||
def __init__(self, process_transaction_callback = None, on_initial_subscription_callback = None, on_bot_message=None):
|
||||
self.process_transaction = process_transaction_callback
|
||||
self.on_initial_subscription = on_initial_subscription_callback
|
||||
@ -267,10 +276,16 @@ class SolanaAPI:
|
||||
if solana_ws.websocket:
|
||||
await solana_ws.close()
|
||||
await async_safe_call(self.on_bot_message,"Reconnecting...")
|
||||
receive_task.cancel()
|
||||
process_task.cancel()
|
||||
if self.receive_task and not self.receive_task.cancelled():
|
||||
receive_task.cancel()
|
||||
if self.process_task and not self.process_task.cancelled():
|
||||
process_task.cancel()
|
||||
except Exception as e:
|
||||
logger.error(f"An error occurred while unsubscribing: {e}")
|
||||
finally:
|
||||
self.receive_task = None
|
||||
self.process_task = None
|
||||
|
||||
await asyncio.sleep(5)
|
||||
|
||||
async def get_last_transactions(self, account_address, check_interval=300, limit=1000):
|
||||
@ -307,11 +322,9 @@ class SolanaAPI:
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def get_token_metadata_symbol(mint_address):
|
||||
global TOKENS_INFO
|
||||
|
||||
if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]:
|
||||
return TOKENS_INFO[mint_address].get('symbol')
|
||||
async def get_token_metadata_symbol(self, mint_address):
|
||||
if mint_address in DEX.TOKENS_INFO and 'symbol' in DEX.TOKENS_INFO[mint_address]:
|
||||
return DEX.TOKENS_INFO[mint_address].get('symbol')
|
||||
|
||||
try:
|
||||
account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", mint_address)
|
||||
@ -320,32 +333,82 @@ class SolanaAPI:
|
||||
if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
|
||||
account_data_info = account_data_data['parsed']['info']
|
||||
if 'decimals' in account_data_info:
|
||||
if mint_address in TOKENS_INFO:
|
||||
TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals']
|
||||
if mint_address in DEX.TOKENS_INFO:
|
||||
DEX.TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals']
|
||||
else:
|
||||
TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']}
|
||||
DEX.TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']}
|
||||
if 'tokenName' in account_data_info:
|
||||
if mint_address in TOKENS_INFO:
|
||||
TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName']
|
||||
if mint_address in DEX.TOKENS_INFO:
|
||||
DEX.TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName']
|
||||
else:
|
||||
TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']}
|
||||
DEX.TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']}
|
||||
|
||||
metadata = await get_token_metadata(mint_address)
|
||||
metadata = await self.get_token_metadata(mint_address)
|
||||
if metadata:
|
||||
if mint_address in TOKENS_INFO:
|
||||
TOKENS_INFO[mint_address].update(metadata)
|
||||
if mint_address in DEX.TOKENS_INFO:
|
||||
DEX.TOKENS_INFO[mint_address].update(metadata)
|
||||
else:
|
||||
TOKENS_INFO[mint_address] = metadata
|
||||
await save_token_info()
|
||||
# TOKENS_INFO[mint_address] = metadata
|
||||
DEX.TOKENS_INFO[mint_address] = metadata
|
||||
await DEX.save_token_info()
|
||||
# DEX.TOKENS_INFO[mint_address] = metadata
|
||||
# return metadata.get('symbol') or metadata.get('name')
|
||||
return TOKENS_INFO[mint_address].get('symbol')
|
||||
return DEX.TOKENS_INFO[mint_address].get('symbol')
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
|
||||
return None
|
||||
|
||||
async def get_transaction_details_rpc(tx_signature, readfromDump=False):
|
||||
global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO
|
||||
async def get_token_metadata(self, mint_address):
|
||||
try:
|
||||
# Convert mint_address to PublicKey if it's a string
|
||||
if isinstance(mint_address, str):
|
||||
mint_pubkey = Pubkey.from_string(mint_address)
|
||||
else:
|
||||
mint_pubkey = mint_address
|
||||
|
||||
# Derive metadata account address
|
||||
|
||||
metadata_program_id = Pubkey.from_string("metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s")
|
||||
metadata_account = Pubkey.find_program_address(
|
||||
[b"metadata", bytes(metadata_program_id), bytes(mint_pubkey)],
|
||||
metadata_program_id
|
||||
)[0]
|
||||
|
||||
# Fetch metadata account info
|
||||
metadata_account_info = await solana_client.get_account_info(metadata_account)
|
||||
|
||||
if metadata_account_info.value is not None:
|
||||
data = metadata_account_info.value.data
|
||||
# name = get_token_name_metadata(data).rstrip("\x00")
|
||||
# Skip the first 1 + 32 + 32 bytes (1 byte for version, 32 bytes each for update authority and mint)
|
||||
offset = 1 + 32 + 32
|
||||
# Read the name length (u32)
|
||||
name_length = struct.unpack("<I", data[offset:offset+4])[0]
|
||||
offset += 4
|
||||
# Read the name
|
||||
try:
|
||||
name = data[offset:offset+name_length].decode('utf-8').rstrip("\x00")
|
||||
except Exception as e: name = None
|
||||
offset += name_length
|
||||
# Read the symbol length (u32)
|
||||
symbol_length = struct.unpack("<I", data[offset:offset+4])[0]
|
||||
offset += 4
|
||||
# Read the symbol
|
||||
try:
|
||||
symbol = data[offset:offset+symbol_length].decode('utf-8').rstrip("\x00")
|
||||
except Exception as e: symbol = None
|
||||
|
||||
# metadata = METADATA_STRUCT.parse(data)
|
||||
|
||||
return {"name": name, "symbol": symbol, "address": mint_address}
|
||||
except Exception as e:
|
||||
logging.error(f"Error fetching token metadata for {mint_address}: {str(e)}")
|
||||
|
||||
return None
|
||||
|
||||
|
||||
|
||||
|
||||
async def get_transaction_details_rpc(self, tx_signature, readfromDump=False):
|
||||
try:
|
||||
if readfromDump and os.path.exists('./logs/transation_details.json'):
|
||||
with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details
|
||||
@ -437,21 +500,21 @@ class SolanaAPI:
|
||||
account_data_info = account_data_data['parsed']['info']
|
||||
if 'mint' in account_data_info:
|
||||
transfer['mint'] = account_data_info['mint']
|
||||
if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]:
|
||||
await get_token_metadata_symbol(transfer['mint'])
|
||||
if transfer['mint'] in DEX.TOKENS_INFO or 'decimals' not in DEX.TOKENS_INFO[transfer['mint']]:
|
||||
await self.get_token_metadata_symbol(transfer['mint'])
|
||||
# get actual prices
|
||||
current_price = await get_token_prices([transfer['mint']])
|
||||
current_price = await self.get_token_prices([transfer['mint']])
|
||||
|
||||
if parsed_result["token_in"] is None:
|
||||
parsed_result["token_in"] = transfer['mint']
|
||||
parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol']
|
||||
parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
|
||||
parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']])
|
||||
parsed_result["symbol_in"] = DEX.TOKENS_INFO[transfer['mint']]['symbol']
|
||||
parsed_result["amount_in"] = transfer['amount']/10**DEX.TOKENS_INFO[transfer['mint']]['decimals']
|
||||
parsed_result["amount_in_USD"] = parsed_result["amount_in"] * DEX.TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']])
|
||||
elif parsed_result["token_out"] is None:
|
||||
parsed_result["token_out"] = transfer['mint']
|
||||
parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol']
|
||||
parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals']
|
||||
parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price']
|
||||
parsed_result["symbol_out"] = DEX.TOKENS_INFO[transfer['mint']]['symbol']
|
||||
parsed_result["amount_out"] = transfer['amount']/10**DEX.TOKENS_INFO[transfer['mint']]['decimals']
|
||||
parsed_result["amount_out_USD"] = parsed_result["amount_out"] * DEX.TOKENS_INFO[transfer['mint']]['price']
|
||||
|
||||
pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', [])
|
||||
for balance in pre_balalnces:
|
||||
@ -466,7 +529,7 @@ class SolanaAPI:
|
||||
parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100
|
||||
else:
|
||||
# calculate based on total wallet value: FOLLOWED_WALLET_VALUE
|
||||
parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100
|
||||
parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / DEX.FOLLOWED_WALLET_VALUE) * 100
|
||||
except Exception as e:
|
||||
logging.error(f"Error calculating percentage swapped: {e}")
|
||||
|
||||
@ -483,12 +546,9 @@ class SolanaAPI:
|
||||
# "Program log: after_source_balance: 0, after_destination_balance: 472509072",
|
||||
# "Program log: source_token_change: 58730110139, destination_token_change: 270131294",
|
||||
|
||||
async def get_transaction_details_info(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]:
|
||||
global TOKENS_INFO
|
||||
|
||||
async def get_transaction_details_info(self, tx_signature_str: str, logs: List[str]) -> Dict[str, Any]:
|
||||
tr_info = await self.get_transaction_details_with_retry(tx_signature_str)
|
||||
|
||||
|
||||
try:
|
||||
tr_info['percentage_swapped'] = (tr_info['amount_in'] / tr_info['before_source_balance']) * 100 if tr_info['before_source_balance'] > 0 else 50
|
||||
except Exception as e:
|
||||
@ -503,7 +563,7 @@ class SolanaAPI:
|
||||
# return float(balance['uiTokenAmount']['amount'])
|
||||
# return 0.0
|
||||
|
||||
async def get_transaction_details_with_retry(transaction_id, retry_delay = 5, max_retries = 16):
|
||||
async def get_transaction_details_with_retry(self, transaction_id, retry_delay = 5, max_retries = 16):
|
||||
# wait for the transaction to be confirmed
|
||||
# await async_client.wait_for_confirmation(Signature.from_string(transaction_id))
|
||||
# query every 5 seconds for the transaction details until not None or 30 seconds
|
||||
@ -521,7 +581,7 @@ class SolanaAPI:
|
||||
return tx_details
|
||||
|
||||
|
||||
async def get_swap_transaction_details(tx_signature_str):
|
||||
async def get_swap_transaction_details(self, tx_signature_str):
|
||||
t = await self.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0)
|
||||
try:
|
||||
parsed_result = {
|
||||
@ -564,7 +624,7 @@ class SolanaAPI:
|
||||
return None
|
||||
|
||||
|
||||
async def get_token_balance_rpc(wallet_address, token_address):
|
||||
async def get_token_balance_rpc(self, wallet_address, token_address):
|
||||
try:
|
||||
accounts = await self.solana_ws.solana_jsonrpc("getTokenAccountsByOwner", [
|
||||
wallet_address,
|
||||
@ -601,6 +661,162 @@ class SolanaAPI:
|
||||
logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
|
||||
return 0
|
||||
|
||||
async def follow_move(self,move):
|
||||
your_balances = await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
||||
your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
|
||||
if your_balance_info is not None:
|
||||
# Use the balance
|
||||
print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
|
||||
else:
|
||||
print(f"No ballance found for {move['symbol_in']}. Skipping move.")
|
||||
await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
|
||||
return
|
||||
|
||||
your_balance = your_balance_info['amount']
|
||||
|
||||
|
||||
token_info = DEX.TOKENS_INFO.get(move['token_in'])
|
||||
token_name_in = token_info.get('symbol') or await SAPI.get_token_metadata_symbol(move['token_in'])
|
||||
token_name_out = DEX.TOKENS_INFO[move['token_out']].get('symbol') or await SAPI.get_token_metadata_symbol(move['token_out'])
|
||||
|
||||
if not your_balance:
|
||||
msg = f"<b>Move not followed:</b>\nNo balance found for token {move['symbol_in']}. Cannot follow move."
|
||||
logging.warning(msg)
|
||||
await telegram_utils.send_telegram_message(msg)
|
||||
return
|
||||
|
||||
if FOLLOW_AMOUNT == 'percentage':
|
||||
# Calculate the amount to swap based on the same percentage as the followed move
|
||||
amount_to_swap = your_balance * (move['percentage_swapped'] / 100)
|
||||
elif FOLLOW_AMOUNT == 'exact':
|
||||
amount_to_swap = move['amount_in']
|
||||
else:
|
||||
try:
|
||||
fixed_amount = float(FOLLOW_AMOUNT) # un USD
|
||||
fixed_amount_in_token = fixed_amount / move["token_in_price"]
|
||||
amount_to_swap = min(fixed_amount_in_token, your_balance)
|
||||
except ValueError:
|
||||
msg = f"<b>Move not followed:</b>\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number."
|
||||
logging.warning(msg)
|
||||
await telegram_utils.send_telegram_message(msg)
|
||||
return
|
||||
|
||||
amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have
|
||||
|
||||
decimals = token_info.get('decimals')
|
||||
# Convert to lamports
|
||||
# if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9
|
||||
amount = int(amount_to_swap * 10**decimals)
|
||||
amount = int(amount)
|
||||
logging.debug(f"Calculated amount in lamports: {amount}")
|
||||
|
||||
if your_balance < amount_to_swap: # should not happen
|
||||
msg = (
|
||||
f"<b>Warning:</b>\n"
|
||||
f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway."
|
||||
)
|
||||
logging.warning(msg)
|
||||
await telegram_utils.send_telegram_message(msg)
|
||||
try:
|
||||
|
||||
try:
|
||||
notification = (
|
||||
f"<b>Initiating move:</b>\n"
|
||||
f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}"
|
||||
+ (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "")
|
||||
)
|
||||
# logging.info(notification)
|
||||
# error_logger.info(notification)
|
||||
# await telegram_utils.send_telegram_message(notification)
|
||||
except Exception as e:
|
||||
logging.error(f"Error sending notification: {e}")
|
||||
|
||||
if self.pk is None:
|
||||
self.pk = await get_pk()
|
||||
for retry in range(3):
|
||||
try:
|
||||
private_key = Keypair.from_bytes(base58.b58decode(self.pk))
|
||||
async_client = AsyncClient(SOLANA_WS_URL)
|
||||
jupiter = Jupiter(async_client, private_key)
|
||||
transaction_data = await jupiter.swap(
|
||||
input_mint=move['token_in'],
|
||||
output_mint=move['token_out'],
|
||||
amount=int(amount),
|
||||
slippage_bps=300, # Increased to 3%
|
||||
)
|
||||
logging.info(f"Initiating move. Transaction data:\n {transaction_data}")
|
||||
# error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}")
|
||||
raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
|
||||
message = raw_transaction.message
|
||||
|
||||
signature = private_key.sign_message( bytes(message) )
|
||||
# signature = private_key.sign_message(message.to_bytes_versioned())
|
||||
signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
|
||||
opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
|
||||
|
||||
# send the transaction
|
||||
result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
|
||||
|
||||
transaction_id = json.loads(result.to_json())['result']
|
||||
print(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}")
|
||||
# append to notification
|
||||
notification += f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>{transaction_id}</a>"
|
||||
|
||||
await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
|
||||
tx_details = await SAPI.get_transaction_details_with_retry(transaction_id)
|
||||
|
||||
if tx_details is not None:
|
||||
break
|
||||
else:
|
||||
logging.warning(f"Failed to get transaction details for {transaction_id}. Probably transaction failed. Retrying again...")
|
||||
await asyncio.sleep(3)
|
||||
except Exception as e:
|
||||
error_message = f"<b>Move Failed:</b>\n{str(e)}</b>\n{transaction_data}</b>\n{move}"
|
||||
logging.error(error_message)
|
||||
# log the errors to /logs/errors.log
|
||||
# error_logger.error(error_message)
|
||||
# error_logger.exception(e)
|
||||
await telegram_utils.send_telegram_message(error_message)
|
||||
amount = int(amount * 0.75)
|
||||
|
||||
await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
|
||||
|
||||
try:
|
||||
if tx_details is None:
|
||||
logging.info(f"Failed to get transaction details for {transaction_id}")
|
||||
notification = (
|
||||
f"<b>Move Followed, failed to get transaction details.</b>\n"
|
||||
f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['token_in']}) "
|
||||
f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
|
||||
f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>{transaction_id}</a>"
|
||||
# log_successful_swap ()
|
||||
)
|
||||
|
||||
else:
|
||||
notification = (
|
||||
f"<b>Move Followed:</b>\n"
|
||||
f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['symbol_in']}) "
|
||||
f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
|
||||
f"for {tx_details['amount_out']:.2f} {token_name_out}"
|
||||
# f"Amount In USD: {tr_details['amount_in_USD']}\n"
|
||||
f"\n\n<b>Transaction:</b> <a href='https://solscan.io/tx/{transaction_id}'>{transaction_id}</a>"
|
||||
)
|
||||
logging.info(notification)
|
||||
await telegram_utils.send_telegram_message(notification)
|
||||
except Exception as e:
|
||||
logging.error(f"Error sending notification: {e}")
|
||||
|
||||
except Exception as e:
|
||||
error_message = f"<b>Swap Follow Error:</b>\n{str(e)}"
|
||||
logging.error(error_message)
|
||||
# log the errors to /logs/errors.log
|
||||
# error_logger.error(error_message)
|
||||
# error_logger.exception(e)
|
||||
# if error_message contains 'Program log: Error: insufficient funds'
|
||||
if 'insufficient funds' in error_message:
|
||||
await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.")
|
||||
else:
|
||||
await telegram_utils.send_telegram_message(error_message)
|
||||
|
||||
|
||||
|
||||
@ -614,9 +830,11 @@ class SolanaDEX:
|
||||
self.TOKEN_ADDRESSES = {}
|
||||
self.FOLLOWED_WALLET_VALUE = 0
|
||||
self.YOUR_WALLET_VALUE = 0
|
||||
self.token_info_path = os.path.normpath(os.path.join(script_dir, '..', 'cache', 'token_info.json'))
|
||||
try:
|
||||
with open('../logs/token_info.json', 'r') as f:
|
||||
with open(self.token_info_path, 'r') as f:
|
||||
self.TOKENS_INFO = json.load(f)
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error loading token info: {str(e)}")
|
||||
|
||||
@ -654,7 +872,7 @@ class SolanaDEX:
|
||||
for token, price in prices.items():
|
||||
token_info = self.TOKENS_INFO.setdefault(token, {})
|
||||
if 'symbol' not in token_info:
|
||||
token_info['symbol'] = await self.get_token_metadata_symbol(token)
|
||||
token_info['symbol'] = await SAPI.get_token_metadata_symbol(token)
|
||||
token_info['price'] = price
|
||||
|
||||
return prices
|
||||
@ -914,21 +1132,16 @@ class SolanaDEX:
|
||||
# save token info to file
|
||||
await self.save_token_info()
|
||||
|
||||
async def save_token_info(self):
|
||||
with open(self.token_info_path, 'w') as f:
|
||||
json.dump(self.TOKENS_INFO, f, indent=2)
|
||||
|
||||
@staticmethod
|
||||
def safe_get_property(obj, prop):
|
||||
return obj.get(prop, 'N/A')
|
||||
|
||||
async def get_token_metadata_symbol(self, token_address):
|
||||
# Implement this method to fetch token metadata symbol
|
||||
pass
|
||||
|
||||
async def save_token_info(self):
|
||||
# Implement this method to save token info to a file
|
||||
pass
|
||||
|
||||
async def save_token_info():
|
||||
with open('./logs/token_info.json', 'w') as f:
|
||||
json.dump(TOKENS_INFO, f, indent=2)
|
||||
|
||||
DEX = SolanaDEX(DISPLAY_CURRENCY)
|
||||
SAPI = SolanaAPI( on_initial_subscription_callback=DEX.list_initial_wallet_states(FOLLOWED_WALLET,YOUR_WALLET))
|
@ -11,7 +11,7 @@ from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME
|
||||
|
||||
|
||||
import asyncio
|
||||
from typing import Callable, Any
|
||||
from typing import Callable, Any, Union, Coroutine
|
||||
import time
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
@ -111,7 +111,11 @@ def safe_get_property(info, property_name, default='Unknown'):
|
||||
value = info.get(property_name, default)
|
||||
return str(value) if value is not None else str(default)
|
||||
|
||||
async def async_safe_call(func: Callable, *args: Any, **kwargs: Any) -> Any:
|
||||
async def async_safe_call(
|
||||
func: Union[Callable, Coroutine, None],
|
||||
*args: Any,
|
||||
**kwargs: Any
|
||||
) -> Any:
|
||||
"""
|
||||
Safely call a function that might be synchronous, asynchronous, or a coroutine object.
|
||||
|
||||
@ -123,18 +127,30 @@ async def async_safe_call(func: Callable, *args: Any, **kwargs: Any) -> Any:
|
||||
if func is None:
|
||||
return None
|
||||
|
||||
if callable(func):
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
return await func(*args, **kwargs)
|
||||
try:
|
||||
if asyncio.iscoroutine(func):
|
||||
# If func is already a coroutine object, just await it
|
||||
return await func
|
||||
elif callable(func):
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
# If func is an async function, call it with args and await
|
||||
return await func(*args, **kwargs)
|
||||
else:
|
||||
# If func is a regular function, just call it
|
||||
return func(*args, **kwargs)
|
||||
else:
|
||||
return func(*args, **kwargs)
|
||||
elif asyncio.iscoroutine(func):
|
||||
# If func is already a coroutine object, just await it
|
||||
return await func
|
||||
else:
|
||||
logging.warning(f"Expected a callable or coroutine, but got {type(func)}: {func}")
|
||||
return None
|
||||
logging.warning(f"Expected a callable or coroutine, but got {type(func)}: {func}")
|
||||
return None
|
||||
|
||||
except RuntimeError as e:
|
||||
if "cannot reuse already awaited coroutine" in str(e):
|
||||
logging.error(f"Attempted to reuse an already awaited coroutine: {func}")
|
||||
else:
|
||||
logging.error(f"Runtime error in async_safe_call: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logging.error(f"Error in async_safe_call: {type(e).__name__}: {e}")
|
||||
return None
|
||||
|
||||
# Create a global instance of TelegramUtils
|
||||
telegram_utils = TelegramUtils()
|
||||
|
@ -2,13 +2,19 @@ from flask import Flask, jsonify, request, render_template, redirect, url_for
|
||||
# from flask_oauthlib.client import OAuth
|
||||
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
||||
import secrets
|
||||
import json
|
||||
from modules import storage, utils, SolanaAPI
|
||||
from modules.utils import async_safe_call
|
||||
import os
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
def init_app():
|
||||
on_transaction = None
|
||||
|
||||
def init_app(tr_handler=None):
|
||||
global on_transaction
|
||||
on_transaction = tr_handler
|
||||
app = Flask(__name__, template_folder='../templates', static_folder='../static')
|
||||
app.config['SECRET_KEY'] = 'your-secret-key'
|
||||
login_manager = LoginManager(app)
|
||||
login_manager.login_view = 'login'
|
||||
|
||||
@ -38,7 +44,7 @@ def init_app():
|
||||
async def transaction_notified(wallet, tx_signature):
|
||||
try:
|
||||
logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}")
|
||||
request_data = request.get_json()
|
||||
request_data = request.get_json() if request.is_json else None
|
||||
if not request_data:
|
||||
# Process the transaction
|
||||
# tr = await get_swap_transaction_details(tx_signature)
|
||||
@ -50,6 +56,9 @@ def init_app():
|
||||
# ToDo - probably optimize
|
||||
tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in'])
|
||||
tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out'])
|
||||
prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']])
|
||||
tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in']
|
||||
tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out']
|
||||
|
||||
notification = (
|
||||
f"<b>Got TXN notification:</b>: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n"
|
||||
@ -58,16 +67,15 @@ def init_app():
|
||||
await utils.telegram_utils.send_telegram_message(notification)
|
||||
|
||||
# Store the notified transaction in the database
|
||||
storage.store_transaction(tr)
|
||||
|
||||
await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
|
||||
# Attempt to execute the copytrade transaction
|
||||
try:
|
||||
await SolanaAPI.SAPI.follow_move(tr)
|
||||
# Store the successful copytrade transaction
|
||||
storage.store_copytrade_transaction(tr, success=True)
|
||||
await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
|
||||
except Exception as e:
|
||||
# Store the failed copytrade transaction
|
||||
storage.store_copytrade_transaction(tr, success=False, error=str(e))
|
||||
await storage.store_transaction(wallet, "SWAP_FAIL", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
|
||||
logging.error(f"Copytrade transaction failed: {e}")
|
||||
# ToDo - probably optimize
|
||||
await SolanaAPI.SAPI.save_token_info()
|
||||
@ -76,9 +84,137 @@ def init_app():
|
||||
logging.error(f"Error processing transaction: {e}")
|
||||
return jsonify({"error": "Failed to process transaction"}), 500
|
||||
|
||||
@app.route('/wh', methods=['POST'])
|
||||
async def webhook():
|
||||
try:
|
||||
current_time = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
logger.info("Processing webhook")
|
||||
request_data = request.get_json() if request.is_json else None
|
||||
if not request_data:
|
||||
return jsonify({"error": "No data in request"}), 400
|
||||
logger.info(f"Webhook data: {request_data}")
|
||||
# save dump to /cache/last-webhook-{datetime}.json
|
||||
|
||||
with open( os.path.join(SolanaAPI.root_path, 'logs', f'wh_{current_time}.json') , 'w') as f:
|
||||
json.dump(request_data, f)
|
||||
|
||||
process_wh(request_data)
|
||||
return jsonify({"status": "Webhook processed"}), 200
|
||||
except Exception as e:
|
||||
logging.error(f"Error processing webhook: {e}")
|
||||
return jsonify({"error": "Failed to process webhook"}), 500
|
||||
|
||||
# Flask route to retry processing the last log
|
||||
|
||||
async def process_wh( data):
|
||||
global on_transaction
|
||||
|
||||
try:
|
||||
if data[0].get('type') == "SWAP":
|
||||
swap_event = data[0]['events'].get('swap')
|
||||
if not swap_event:
|
||||
logging.warning("No swap event found in data")
|
||||
return
|
||||
|
||||
# Extract token input details from the first token input
|
||||
token_inputs = swap_event.get('tokenInputs', [])
|
||||
token_outputs = swap_event.get('tokenOutputs', [])
|
||||
|
||||
if not token_inputs or not token_outputs:
|
||||
logging.warning("Missing token inputs or outputs")
|
||||
return
|
||||
|
||||
tr = {
|
||||
'token_in': token_inputs[0]['mint'],
|
||||
'token_out': token_outputs[0]['mint'],
|
||||
'amount_in': float(token_inputs[0]['rawTokenAmount']['tokenAmount']) / 10**token_inputs[0]['rawTokenAmount']['decimals'],
|
||||
'amount_out': float(token_outputs[0]['rawTokenAmount']['tokenAmount']) / 10**token_outputs[0]['rawTokenAmount']['decimals'],
|
||||
}
|
||||
|
||||
if not tr["token_in"] or not tr["token_out"] or tr["amount_in"] == 0 or tr["amount_out"] == 0:
|
||||
logging.warning("Incomplete swap details found in logs. Getting details from transaction")
|
||||
tx_signature = data[0].get('signature')
|
||||
logs = data[0].get('logs', [])
|
||||
tr = await SolanaAPI.SAPI.get_transaction_details_info(tx_signature, logs)
|
||||
|
||||
wallet = data[0]['feePayer'] # Using feePayer as the wallet address
|
||||
tx_signature = data[0]['signature']
|
||||
|
||||
# ToDo - probably optimize
|
||||
tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in'])
|
||||
tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out'])
|
||||
prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']])
|
||||
tr["token_in_price"] = prices.get(tr['token_in'], 0)
|
||||
tr["token_out_price"] = prices.get(tr['token_out'], 0)
|
||||
tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in']
|
||||
tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out']
|
||||
|
||||
notification = (
|
||||
f"<b>Got WH notification:</b>: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n"
|
||||
)
|
||||
logging.info(notification)
|
||||
await utils.telegram_utils.send_telegram_message(notification)
|
||||
|
||||
# Store the notified transaction in the database
|
||||
storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
|
||||
# Attempt to execute the copytrade transaction
|
||||
try:
|
||||
# await SolanaAPI.SAPI.follow_move(tr)
|
||||
if on_transaction:
|
||||
await async_safe_call( on_transaction, tr)
|
||||
else:
|
||||
await SolanaAPI.SAPI.follow_move(tr)
|
||||
# Store the successful copytrade transaction
|
||||
storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
|
||||
except Exception as e:
|
||||
# Store the failed copytrade transaction
|
||||
storage.store_transaction(wallet, "SWAP_FAIL", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
|
||||
logging.error(f"Copytrade transaction failed: {e}")
|
||||
# ToDo - probably optimize
|
||||
await SolanaAPI.DEX.save_token_info()
|
||||
|
||||
except Exception as e:
|
||||
logging.error(f"Error processing transaction notification: {str(e)}")
|
||||
# Log the full traceback for debugging
|
||||
import traceback
|
||||
logging.error(traceback.format_exc())
|
||||
|
||||
@app.route('/retry', methods=['GET'])
|
||||
@app.route('/retry-last-log', methods=['GET'])
|
||||
async def retry_last_log():
|
||||
wh = request.args.get('wh', 'false').lower() == 'true'
|
||||
|
||||
latest_log_file = get_latest_log_file(wh)
|
||||
if not latest_log_file:
|
||||
return jsonify({"error": "No log files found"}), 404
|
||||
|
||||
try:
|
||||
utils.log.info(f"Processing latest log file: {latest_log_file}")
|
||||
with open(latest_log_file, 'r') as f:
|
||||
log = json.load(f)
|
||||
|
||||
if wh:
|
||||
result = await process_wh(log)
|
||||
else:
|
||||
result = await SolanaAPI.process_log(log)
|
||||
|
||||
return jsonify({
|
||||
"file": latest_log_file,
|
||||
"status": "Log dump processed successfully",
|
||||
"result": result
|
||||
}), 200
|
||||
|
||||
except Exception as e:
|
||||
utils.log.error(f"Error processing log dump: {e}")
|
||||
return jsonify({"error": "Failed to process log"}), 500
|
||||
|
||||
|
||||
|
||||
|
||||
# # # #
|
||||
# AUTHENTICATION
|
||||
# # # #
|
||||
|
||||
@app.route('/login/google/authorized')
|
||||
def authorized():
|
||||
# resp = google.authorized_response()
|
||||
@ -123,7 +259,9 @@ def init_app():
|
||||
else:
|
||||
return render_template('login.html', error='Invalid credentials')
|
||||
elif request.args.get('google'):
|
||||
return google.authorize(callback=url_for('authorized', _external=True))
|
||||
# Uncomment the following line if Google OAuth is set up
|
||||
# return google.authorize(callback=url_for('authorized', _external=True))
|
||||
return render_template('login.html', error='Google OAuth not configured')
|
||||
return render_template('login.html')
|
||||
|
||||
@app.route('/logout')
|
||||
@ -156,42 +294,19 @@ def init_app():
|
||||
holdings = storage.get_holdings(wallet_id)
|
||||
return jsonify(holdings)
|
||||
|
||||
|
||||
# Flask route to retry processing the last log
|
||||
@app.route('/retry', methods=['GET'])
|
||||
@app.route('/retry-last-log', methods=['GET'])
|
||||
async def retry_last_log():
|
||||
latest_log_file = get_latest_log_file()
|
||||
if not latest_log_file:
|
||||
return jsonify({"error": "No log files found"}), 404
|
||||
|
||||
try:
|
||||
utils.log.info(f"Processing latest log file: {latest_log_file}")
|
||||
with open(latest_log_file, 'r') as f:
|
||||
log = json.load(f)
|
||||
|
||||
result = await SolanaAPI.process_log(log)
|
||||
|
||||
return jsonify({
|
||||
"file": latest_log_file,
|
||||
"status": "Log dump processed successfully",
|
||||
"result": result
|
||||
}), 200
|
||||
|
||||
except Exception as e:
|
||||
utils.log.error(f"Error processing log dump: {e}")
|
||||
return jsonify({"error": "Failed to process log"}), 500
|
||||
|
||||
|
||||
return app
|
||||
|
||||
|
||||
# Function to find the latest log file
|
||||
def get_latest_log_file():
|
||||
log_dir = './logs'
|
||||
def get_latest_log_file(wh:bool):
|
||||
log_dir = os.path.join(SolanaAPI.root_path, 'logs')
|
||||
try:
|
||||
# files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))]
|
||||
# filter files mask log_20241005_004103_143116.json
|
||||
files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('log_')]
|
||||
if wh:
|
||||
files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('wh-')]
|
||||
else:
|
||||
files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('log_')]
|
||||
|
||||
latest_file = max(files, key=lambda x: os.path.getmtime(os.path.join(log_dir, x)))
|
||||
return os.path.join(log_dir, latest_file)
|
||||
|
Loading…
x
Reference in New Issue
Block a user