From 7a1ef2fb7a164f735eb27f5f1d0000adf133e81a Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Tue, 22 Oct 2024 01:27:46 +0300 Subject: [PATCH] refactor wip --- crypto/sol/app.py | 604 ++------------------------------ crypto/sol/config.py | 7 + crypto/sol/modules/SolanaAPI.py | 160 ++++++++- crypto/sol/modules/utils.py | 45 ++- crypto/sol/modules/webui.py | 15 +- 5 files changed, 234 insertions(+), 597 deletions(-) diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 636d574..867e270 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -4,85 +4,39 @@ from asgiref.wsgi import WsgiToAsgi import websockets import json from flask import Flask, render_template, request, jsonify -from solana.rpc.async_api import AsyncClient -from solana.transaction import Signature -from solana.rpc.websocket_api import connect -from solana.rpc.commitment import Confirmed, Processed -from solana.transaction import Transaction -from spl.token.client import Token -from base64 import b64decode -import base58 -from solders.rpc.requests import GetTransaction -from solders.signature import Signature -from solders.pubkey import Pubkey -from solders.keypair import Keypair -from solders.transaction import VersionedTransaction -from solders.transaction import Transaction -from solders.message import Message -from solders.instruction import Instruction -from solders.hash import Hash -from solders.instruction import CompiledInstruction -from solders import message -from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA -from dexscreener import DexscreenerClient -from solana.rpc.types import TokenAccountOpts, TxOpts import datetime import base64 import os -from dotenv import load_dotenv,set_key +import base58 +from dotenv import load_dotenv, set_key import aiohttp -from typing import List, Dict +from typing import List, Dict, Any, Tuple import requests import re -from typing import List, Dict, Any, Tuple import random from threading import Thread +from solana.keypair import Keypair +from solana.rpc.async_api import AsyncClient +from solana.transaction import VersionedTransaction, TxOpts +from solana.rpc.types import Processed +from jupiter import Jupiter app = Flask(__name__) from modules.webui import init_app from modules.storage import init_db, store_transaction -from modules.utils import telegram_utils, logging - +from modules.utils import telegram_utils, logging, get_pk, send_telegram_message +from modules.SolanaAPI import SAPI, SolanaAPI, get_wallet_balances, get_transaction_details_with_retry, save_token_info # config = load_config() load_dotenv() load_dotenv('.env.secret') # Configuration -DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") -FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") -YOUR_WALLET = os.getenv("YOUR_WALLET") -TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") -SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") -SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") -DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') -FOLLOW_AMOUNT = os.getenv('FOLLOW_AMOUNT', 'percentage') +from config import (FOLLOWED_WALLET, YOUR_WALLET, SOLANA_WS_URL, SOLANA_HTTP_URL, FOLLOW_AMOUNT, SOLANA_ENDPOINTS, logging, error_logger, logger) -# Configuration -DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") -FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") -YOUR_WALLET = os.getenv("YOUR_WALLET") -TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") -SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") -SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") -DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') - - -# Use the production Solana RPC endpoint -solana_client = AsyncClient(SOLANA_HTTP_URL) -dexscreener_client = DexscreenerClient() - - - -# Token addresses (initialize with some known tokens) -TOKEN_ADDRESSES = { - "SOL": "So11111111111111111111111111111111111111112", - "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", - "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og", -} TOKENS_INFO = {} try: @@ -92,11 +46,11 @@ except Exception as e: logging.error(f"Error loading token info: {str(e)}") # # # # # # # # # # TELEGRAM # # # # # # # # # # -if not telegram_utils.bot: - try: - asyncio.run(telegram_utils.initialize()) - except Exception as e: - logging.error(f"Error initializing Telegram bot: {str(e)}") +# if not telegram_utils.bot: +# try: +# asyncio.run(telegram_utils.initialize()) +# except Exception as e: +# logging.error(f"Error initializing Telegram bot: {str(e)}") # async def telegram_utils.send_telegram_message(message): # try: # await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) @@ -116,321 +70,14 @@ if not telegram_utils.bot: # # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # # -solana_client = AsyncClient(SOLANA_HTTP_URL) - -async def get_token_balance_rpc(wallet_address, token_address): - url = SOLANA_HTTP_URL - headers = {"Content-Type": "application/json"} - data = { - "jsonrpc": "2.0", - "id": 1, - "method": "getTokenAccountsByOwner", - "params": [ - wallet_address, - { - "mint": token_address - }, - { - "encoding": "jsonParsed" - } - ] - } - try: - response = requests.post(url, headers=headers, data=json.dumps(data)) - response.raise_for_status() # Raises an error for bad responses - accounts = response.json() - - if 'result' in accounts and accounts['result']['value']: - first_account = accounts['result']['value'][0]['pubkey'] - balance_data = { - "jsonrpc": "2.0", - "id": 1, - "method": "getTokenAccountBalance", - "params": [ - first_account - ] - } - balance_response = requests.post(url, headers=headers, data=json.dumps(balance_data)) - balance_response.raise_for_status() - balance = balance_response.json() - - if 'result' in balance and 'value' in balance['result']: - amount = float(balance['result']['value']['uiAmount']) - logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}") - return amount - else: - logging.debug(f"No balance found for {token_address} in {wallet_address}") - return 0 - else: - logging.debug(f"No account found for {token_address} in {wallet_address}") - return 0 - except requests.exceptions.RequestException as e: - logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}") - return 0 - - -# # # solders/solana libs (solana_client) # # # -from spl.token._layouts import MINT_LAYOUT -from solana.rpc.api import Client, Pubkey -from spl.token.async_client import AsyncToken - -from spl.token.constants import TOKEN_PROGRAM_ID -from borsh_construct import String, CStruct - - -METADATA_STRUCT = CStruct( - "update_authority" / String, - "mint" / String, - "name" / String, - "symbol" / String, - "uri" / String, - # ... other fields ... -) -import struct -def get_token_name_metadata(metadata_account_data): - try: - # Skip the first 1 + 32 + 32 bytes (1 byte for version, 32 bytes each for update authority and mint) - offset = 1 + 32 + 32 - - # Read the name length (u32) - name_length = struct.unpack(" update_authority -# stream read less than specified amount, expected 2189641476, found 675 - - # # Parse metadata - # key = data[:4] - # update_authority = Pubkey(data[4:36]) - # mint = Pubkey(data[36:68]) - # name_length = struct.unpack(' 0 and data[0] == 4: - # name_len = int.from_bytes(data[1:5], byteorder="little") - # name = data[5:5+name_len].decode("utf-8").strip("\x00") - - # symbol_len = int.from_bytes(data[5+name_len:9+name_len], byteorder="little") - # symbol = data[9+name_len:9+name_len+symbol_len].decode("utf-8").strip("\x00") - - # return {"name": name, "symbol": symbol} - except Exception as e: - logging.error(f"Error fetching token metadata for {mint_address}: {str(e)}") - - return None - - -async def get_wallet_balances(wallet_address, doGetTokenName=True): - balances = {} - logging.info(f"Getting balances for wallet: {wallet_address}") - global TOKENS_INFO - try: - response = await solana_client.get_token_accounts_by_owner_json_parsed( - Pubkey.from_string(wallet_address), - opts=TokenAccountOpts( - program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") - ), - commitment=Confirmed - ) - - if response.value: - for account in response.value: - try: - parsed_data = account.account.data.parsed - if isinstance(parsed_data, dict) and 'info' in parsed_data: - info = parsed_data['info'] - if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info: - mint = info['mint'] - decimals = info['tokenAmount']['decimals'] - amount = float(info['tokenAmount']['amount'])/10**decimals - if amount > 0: - if mint in TOKENS_INFO: - token_name = TOKENS_INFO[mint].get('symbol') - elif doGetTokenName: - token_name = await get_token_metadata_symbol(mint) or 'N/A' - # sleep for 1 second to avoid rate limiting - await asyncio.sleep(2) - - TOKENS_INFO[mint]['holdedAmount'] = round(amount, decimals) - TOKENS_INFO[mint]['decimals'] = decimals - balances[mint] = { - 'name': token_name or 'N/A', - 'address': mint, - 'amount': amount, - 'decimals': decimals - } - # sleep for 1 second to avoid rate limiting - logging.debug(f"Account balance for {token_name} ({mint}): {amount}") - else: - logging.warning(f"Unexpected data format for account: {account}") - except Exception as e: - logging.error(f"Error parsing account data: {str(e)}") - - sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) - if sol_balance.value is not None: - balances['SOL'] = { - 'name': 'SOL', - 'address': 'SOL', - 'amount': sol_balance.value / 1e9 - } - else: - logging.warning(f"SOL balance response missing for wallet: {wallet_address}") - - except Exception as e: - logging.error(f"Error getting wallet balances: {str(e)}") - logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") - return balances - -async def convert_balances_to_currency(balances , sol_price): - converted_balances = {} - for address, info in balances.items(): - converted_balance = info.copy() # Create a copy of the original info - if info['name'] == 'SOL': - converted_balance['value'] = info['amount'] * sol_price - elif address in TOKEN_PRICES: - converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] - else: - converted_balance['value'] = None # Price not available - logging.warning(f"Price not available for token {info['name']} ({address})") - converted_balances[address] = converted_balance - return converted_balances - - -async def get_swap_transaction_details(tx_signature_str): - t = await solana_client.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0) - try: - parsed_result = { - "order_id": None, - "token_in": None, - "token_out": None, - "amount_in": 0, - "amount_out": 0, - "amount_in_USD": 0, - "amount_out_USD": 0, - "percentage_swapped": 0 - } - - instructions = t.value.transaction.transaction.message.instructions - # Parse the swap instruction to extract token addresses, amounts, and types - for instruction in instructions: - if isinstance(instruction, CompiledInstruction): - if instruction.program_id == Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"): - parsed_info = instruction.parsed.info - mint = parsed_info["mint"] - amount = float(parsed_info["tokenAmount"]["amount"]) / (10 ** parsed_info["tokenAmount"]["decimals"]) - - # Determine token in and token out based on balances - if parsed_result["token_in"] is None and amount > 0: - parsed_result["token_in"] = mint - parsed_result["amount_in"] = amount - elif parsed_result["token_out"] is None: - parsed_result["token_out"] = mint - parsed_result["amount_out"] = amount - - # Calculate percentage swapped - if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0: - parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100 - - return parsed_result - - except Exception as e: - logging.error(f"Error fetching transaction details: {e}") - - return None # # # # # # # # # # Functionality # # # # # # # # # # -def safe_get_property(info, property_name, default='Unknown'): - if not isinstance(info, dict): - return str(default) - value = info.get(property_name, default) - return str(value) if value is not None else str(default) - -async def save_token_info(): - with open('./logs/token_info.json', 'w') as f: - json.dump(TOKENS_INFO, f, indent=2) - -async def get_transaction_details_with_retry(transaction_id, retry_delay = 5, max_retries = 16): - # wait for the transaction to be confirmed - # await async_client.wait_for_confirmation(Signature.from_string(transaction_id)) - # query every 5 seconds for the transaction details until not None or 30 seconds - for _ in range(max_retries): - try: - tx_details = await get_transaction_details_rpc(transaction_id) - if tx_details is not None: - break - except Exception as e: - logging.error(f"Error fetching transaction details: {e}") - retry_delay = retry_delay * 1.2 - logging.info(f"({_} of {max_retries}) Waiting for transaction details for {transaction_id}. retry in {retry_delay} s.") - await asyncio.sleep(retry_delay) - retry_delay *= 1.2 - return tx_details async def save_log(log): @@ -578,27 +225,16 @@ async def process_log(log_result): async def get_transaction_details_info(tx_signature_str: str, logs: List[str]) -> Dict[str, Any]: global TOKENS_INFO - tr_info = await get_transaction_details_with_retry(tx_signature_str) + tr_info = await SAPI.get_transaction_details_with_retry(tx_signature_str) - # Fetch token prices: ToDo: check, but already did - # token_prices = await get_token_prices([tr_info['token_in'], tr_info['token_out']]) - # # for token, price in token_prices.items(): - # # if not token in TOKENS_INFO or not TOKENS_INFO[token].get('symbol'): - # # token_name = await get_token_metadata_symbol(token) - # # TOKENS_INFO[token] = {'symbol': token_name} - # # TOKENS_INFO[token] = {'price': price} - # # Calculate USD values - # tr_info['amount_in_USD'] = tr_info['amount_in'] * token_prices.get(tr_info['token_in'], 0) - # tr_info['amount_out_USD'] = tr_info['amount_out'] * token_prices.get(tr_info['token_out'], 0) - - # Calculate the percentage of the source balance that was swapped; ToDo: fix decimals for percentage try: tr_info['percentage_swapped'] = (tr_info['amount_in'] / tr_info['before_source_balance']) * 100 if tr_info['before_source_balance'] > 0 else 50 except Exception as e: logging.error(f"Error calculating percentage swapped: {e}") return tr_info + def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: pre_balances = transaction_details.get('meta', {}).get('preTokenBalances', []) for balance in pre_balances: @@ -759,129 +395,8 @@ async def follow_move(move): # Helper functions -SOLANA_ENDPOINTS = [ - "wss://api.mainnet-beta.solana.com", - # "wss://solana-api.projectserum.com", - # "wss://rpc.ankr.com/solana", - # "wss://mainnet.rpcpool.com", -] -PING_INTERVAL = 30 -SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 10 minutes -# async def heartbeat(websocket): -# while True: -# try: -# await websocket.ping() -# await asyncio.sleep(PING_INTERVAL) -# except websockets.exceptions.ConnectionClosed: -# break - -_first_subscription = True -_process_task = None -async def wallet_watch_loop(): - global _first_subscription, _process_task - reconnect_delay = 5 - max_reconnect_delay = 60 - - while True: - try: - try: - subscription_id = None - current_url = random.choice(SOLANA_ENDPOINTS) - async with websockets.connect(current_url, ping_interval=30, ping_timeout=20) as websocket: - logger.info(f"Connected to Solana websocket: {current_url}") - # heartbeat_task = asyncio.create_task(heartbeat(websocket)) - - while True: - if websocket.closed: - break - - subscription_id = await subscribe(websocket) - if subscription_id is not None: - # await send_telegram_message(f"Solana mainnet connected ({subscription_id})...") - if _first_subscription: - asyncio.create_task( list_initial_wallet_states()) - _first_subscription = False - _process_task = asyncio.create_task(process_messages(websocket)) - while True: - try:# drop subscription now - await process_messages(websocket, subscription_id) - # await asyncio.run(_process_task) - # await asyncio.wait_for(_process_task, timeout=SUBSCRIBE_INTERVAL) - except asyncio.TimeoutError: - # Timeout occurred, time to resubscribe - if not PROCESSING_LOG: - _process_task.cancel() - try: - await _process_task - except asyncio.CancelledError: - pass - await unsubscribe(websocket, subscription_id) - new_sub_id = await subscribe(websocket) - if new_sub_id is None: break - if new_sub_id > 1: # we sometimes get True instead of integer, so we cje - subscription_id = new_sub_id - logger.info(f"New subscription created with ID: {subscription_id}") - elif new_sub_id is True: - # Already subscribed - logger.info("Already subscribed, continuing with existing subscription") - if subscription_id: - process_task = asyncio.create_task(process_messages(websocket)) - - else: - # process_messages completed (shouldn't happen unless there's an error) - break - else: - send_telegram_message("Failed to connect. Retrying...") - - # heartbeat_task.cancel() - - except websockets.exceptions.WebSocketException as e: - logger.error(f"WebSocket error: {e}") - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - - await unsubscribe(websocket, subscription_id) - await send_telegram_message("reconnecting...") - logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") - await websocket.close() - except Exception as e: - logger.error(f"An unexpected error occurred - breaking watch loop: {e}") - - await asyncio.sleep(reconnect_delay) - reconnect_delay = min(reconnect_delay * 1.2, max_reconnect_delay) - -async def subscribe(websocket): - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsSubscribe", - "params": [ - {"mentions": [FOLLOWED_WALLET]}, - {"commitment": "confirmed"} - ] - } - try: - await websocket.send(json.dumps(request)) - logger.info("Subscription request sent") - return await process_messages(websocket) - except Exception as e: - logger.error(f"An unexpected error occurred: {e}") - return None - -async def unsubscribe(websocket, subscription_id): - if subscription_id: - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsUnsubscribe", - "params": [subscription_id] - } - await websocket.send(json.dumps(request)) - logger.info(f"Unsubscribed from subscription id: {subscription_id}") - subscription_id = None - async def process_messages(websocket): try: while True: @@ -915,27 +430,7 @@ async def process_messages(websocket): logger.error(f"An unexpected error occurred: {e}") -pk = os.getenv("PK") -async def check_PK(): - global pk - if not pk: - try: - script_dir = os.path.dirname(os.path.abspath(__file__)) - with open(os.path.join(script_dir, 'secret.pk'), 'r') as f: - pk = f.read().strip() - if pk: - logging.info("Private key loaded successfully from file.") - else: - logging.warning("Private key file is empty.") - except FileNotFoundError: - logging.warning("Private key file not found.") - except Exception as e: - logging.error(f"Error reading private key file: {str(e)}") - if not pk: - logging.error("Private key not found in environment variables. Will not be able to sign transactions.") - # send TG warning message - await telegram_utils.send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.") - +pk = get_pk() # Convert Flask app to ASGI asgi_app = WsgiToAsgi(app) @@ -943,67 +438,26 @@ asgi_app = WsgiToAsgi(app) async def main(): global solanaAPI, bot, PROCESSING_LOG - telegram_utils.initialize() + await telegram_utils.initialize() await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") - await check_PK() - # new: restart wallet_watch_loop every hour - await wallet_watch_loop() + # process_transaction + await SAPI.wallet_watch_loop() - # while True: - # wallet_watch_task = asyncio.create_task(solanaAPI.wallet_watch_loop()) - - # try: - # # Wait for an hour or until the task completes, whichever comes first - # await asyncio.wait_for(wallet_watch_task, timeout=3600) - # except asyncio.TimeoutError: - # # If an hour has passed, cancel the task if not PROCESSING - # if PROCESSING_LOG: - # logging.info("wallet_watch_loop is processing logs. Will not restart.") - # await telegram_utils.send_telegram_message("wallet_watch_loop is processing logs. Will not restart.") - # else: - # wallet_watch_task.cancel() - # try: - # await wallet_watch_task - # except asyncio.CancelledError: - # logging.info("wallet_watch_loop was cancelled after running for an hour") - # except Exception as e: - # logging.error(f"Error in wallet_watch_loop: {str(e)}") - # await telegram_utils.send_telegram_message(f"Error in wallet_watch_loop: {str(e)}") - - # logging.info("Restarting wallet_watch_loop") - # await telegram_utils.send_telegram_message("Restarting wallet_watch_loop") - - - -def run_asyncio_loop(loop): - asyncio.set_event_loop(loop) - loop.run_forever() async def run_all(): - main_task = asyncio.create_task(main()) - await main_task + await main() if __name__ == '__main__': - # Create a new event loop - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) + try: + asyncio.run(run_all()) + except Exception as e: + logging.error(f"An error occurred: {e}") - # Start the asyncio loop in a separate thread - thread = Thread(target=run_asyncio_loop, args=(loop,)) - thread.start() - - # Schedule the run_all coroutine in the event loop - asyncio.run_coroutine_threadsafe(run_all(), loop) - - # Run Uvicorn in the main thread + flask_app = init_app() uvicorn.run( - "app:asgi_app", # Replace 'app' with the actual name of this Python file if different + flask_app, host="127.0.0.1", port=3001, log_level="debug", reload=True ) - - # When Uvicorn exits, stop the asyncio loop - loop.call_soon_threadsafe(loop.stop) - thread.join() \ No newline at end of file diff --git a/crypto/sol/config.py b/crypto/sol/config.py index e708df8..eba0138 100644 --- a/crypto/sol/config.py +++ b/crypto/sol/config.py @@ -18,7 +18,14 @@ SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') BOT_NAME = os.getenv("BOT_NAME") +FOLLOW_AMOUNT = os.getenv('FOLLOW_AMOUNT', 'percentage') +SOLANA_ENDPOINTS = [ + "wss://api.mainnet-beta.solana.com", + # "wss://solana-api.projectserum.com", + # "wss://rpc.ankr.com/solana", + # "wss://mainnet.rpcpool.com", +] # Token addresses (initialize with some known tokens) TOKEN_ADDRESSES = { "SOL": "So11111111111111111111111111111111111111112", diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index 84c31f5..1328d71 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -4,6 +4,28 @@ import os import aiohttp sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from solders import message +from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA +from dexscreener import DexscreenerClient +from solana.rpc.types import TokenAccountOpts, TxOpts +from solana.rpc.async_api import AsyncClient +from solana.transaction import Signature +from solana.rpc.websocket_api import connect +from solana.rpc.commitment import Confirmed, Processed +from solana.transaction import Transaction +from spl.token.client import Token +from base64 import b64decode +import base58 +from solders.rpc.requests import GetTransaction +from solders.signature import Signature +from solders.pubkey import Pubkey +from solders.keypair import Keypair +from solders.transaction import VersionedTransaction +from solders.transaction import Transaction +from solders.message import Message +from solders.instruction import Instruction +from solders.hash import Hash +from solders.instruction import CompiledInstruction import asyncio import json import logging @@ -14,20 +36,37 @@ import requests from datetime import datetime from solana.rpc.types import TokenAccountOpts, TxOpts + +# # # solders/solana libs (solana_client) # # # +from spl.token._layouts import MINT_LAYOUT +from solana.rpc.api import Client, Pubkey +from spl.token.async_client import AsyncToken + +from spl.token.constants import TOKEN_PROGRAM_ID +from borsh_construct import String, CStruct + +# ------------------ logger = logging.getLogger(__name__) -SOLANA_ENDPOINTS = [ - "wss://api.mainnet-beta.solana.com", -] + PING_INTERVAL = 30 -SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 1 minute +SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 1 minute from config import ( - FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY + FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS ) from modules.utils import telegram_utils +# Use the production Solana RPC endpoint +solana_client = AsyncClient(SOLANA_HTTP_URL) +dexscreener_client = DexscreenerClient() + + + + + + class SolanaWS: def __init__(self, on_message: Optional[callable] = None): self.websocket = None @@ -149,7 +188,7 @@ class SolanaWS: return None class SolanaAPI: - def __init__(self, process_transaction_callback, on_initial_subscription_callback = None, on_bot_message=None): + def __init__(self, process_transaction_callback = None, on_initial_subscription_callback = None, on_bot_message=None): self.process_transaction = process_transaction_callback self.on_initial_subscription = on_initial_subscription_callback self.on_bot_message = on_bot_message, @@ -162,6 +201,7 @@ class SolanaAPI: message = await solana_ws.message_queue.get() await self.process_transaction(message) + _first_subscription = True async def wallet_watch_loop(self): solana_ws = SolanaWS(on_message=self.process_transaction) @@ -399,8 +439,107 @@ class SolanaAPI: except requests.exceptions.RequestException as e: print("Error fetching transaction details:", e) - + + + async def get_transaction_details_with_retry(transaction_id, retry_delay = 5, max_retries = 16): + # wait for the transaction to be confirmed + # await async_client.wait_for_confirmation(Signature.from_string(transaction_id)) + # query every 5 seconds for the transaction details until not None or 30 seconds + for _ in range(max_retries): + try: + tx_details = await self.get_transaction_details_rpc(transaction_id) + if tx_details is not None: + break + except Exception as e: + logging.error(f"Error fetching transaction details: {e}") + retry_delay = retry_delay * 1.2 + logging.info(f"({_} of {max_retries}) Waiting for transaction details for {transaction_id}. retry in {retry_delay} s.") + await asyncio.sleep(retry_delay) + retry_delay *= 1.2 + return tx_details + + + async def get_swap_transaction_details(tx_signature_str): + t = await self.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0) + try: + parsed_result = { + "order_id": None, + "token_in": None, + "token_out": None, + "amount_in": 0, + "amount_out": 0, + "amount_in_USD": 0, + "amount_out_USD": 0, + "percentage_swapped": 0 + } + + instructions = t.value.transaction.transaction.message.instructions + # Parse the swap instruction to extract token addresses, amounts, and types + for instruction in instructions: + if isinstance(instruction, CompiledInstruction): + if instruction.program_id == Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"): + parsed_info = instruction.parsed.info + mint = parsed_info["mint"] + amount = float(parsed_info["tokenAmount"]["amount"]) / (10 ** parsed_info["tokenAmount"]["decimals"]) + # Determine token in and token out based on balances + if parsed_result["token_in"] is None and amount > 0: + parsed_result["token_in"] = mint + parsed_result["amount_in"] = amount + elif parsed_result["token_out"] is None: + parsed_result["token_out"] = mint + parsed_result["amount_out"] = amount + + # Calculate percentage swapped + if parsed_result["amount_in"] > 0 and parsed_result["amount_out"] > 0: + parsed_result["percentage_swapped"] = (parsed_result["amount_out"] / parsed_result["amount_in"]) * 100 + + return parsed_result + + except Exception as e: + logging.error(f"Error fetching transaction details: {e}") + + return None + + + async def get_token_balance_rpc(wallet_address, token_address): + try: + accounts = await self.solana_ws.solana_jsonrpc("getTokenAccountsByOwner", [ + wallet_address, + { + "mint": token_address + }]) + + + if accounts['value']: + first_account = accounts['value'][0]['pubkey'] + balance_data = { + "jsonrpc": "2.0", + "id": 1, + "method": "getTokenAccountBalance", + "params": [ + first_account + ] + } + balance = self.solana_ws.solana_jsonrpc("getTokenAccountBalance", first_account) + + + + if 'value' in balance: + amount = float(balance['value']['uiAmount']) + logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}") + return amount + else: + logging.debug(f"No balance found for {token_address} in {wallet_address}") + return 0 + else: + logging.debug(f"No account found for {token_address} in {wallet_address}") + return 0 + except requests.exceptions.RequestException as e: + logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}") + return 0 + + @@ -585,6 +724,7 @@ class SolanaDEX: sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address return await get_token_prices([sol_address]).get(sol_address, 0.0) + async def get_wallet_balances(wallet_address, doGetTokenName=True): balances = {} logging.info(f"Getting balances for wallet: {wallet_address}") @@ -716,4 +856,10 @@ class SolanaDEX: # save token info to file await save_token_info() + +async def save_token_info(): + with open('./logs/token_info.json', 'w') as f: + json.dump(TOKENS_INFO, f, indent=2) + +SAPI = SolanaAPI() \ No newline at end of file diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py index 8c8edaf..cc8ff4d 100644 --- a/crypto/sol/modules/utils.py +++ b/crypto/sol/modules/utils.py @@ -33,7 +33,7 @@ class TelegramUtils: await self.initialize() try: - # await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) + await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) logging.info(f"Telegram message sent: {message}") except Exception as e: logging.error(f"Error sending Telegram message: {str(e)}") @@ -42,10 +42,7 @@ class TelegramUtils: if self.conn_pool: await self.conn_pool.close() - -class Log: - # Set up success logger for accounting CSV - class CSVFormatter(logging.Formatter): +class CSVFormatter(logging.Formatter): def __init__(self): super().__init__() self.output = None @@ -66,7 +63,9 @@ class Log: record.wallet_address ]) return '' - +class Log: + # Set up success logger for accounting CSV + def __init__(self): logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) @@ -85,7 +84,7 @@ class Log: error_logger.addHandler(error_file_handler) success_log_file = os.path.join(log_dir, 'successful_swaps.csv') success_file_handler = RotatingFileHandler(success_log_file, maxBytes=10*1024*1024, backupCount=5) - success_file_handler.setFormatter(self.CSVFormatter()) + success_file_handler.setFormatter(CSVFormatter()) success_logger_accounting_csv = logging.getLogger('success_logger_accounting_csv') success_logger_accounting_csv.setLevel(logging.INFO) success_logger_accounting_csv.addHandler(success_file_handler) @@ -104,9 +103,39 @@ class Log: }) +def safe_get_property(info, property_name, default='Unknown'): + if not isinstance(info, dict): + return str(default) + value = info.get(property_name, default) + return str(value) if value is not None else str(default) + # Create a global instance of TelegramUtils telegram_utils = TelegramUtils() log = Log() -# You can add more Telegram-related methods to the TelegramUtils class if needed \ No newline at end of file +# You can add more Telegram-related methods to the TelegramUtils class if needed + + + +pk = os.getenv("PK") +async def get_pk(): + global pk + if not pk: + try: + script_dir = os.path.dirname(os.path.abspath(__file__)) + with open(os.path.join(script_dir, 'secret.pk'), 'r') as f: + pk = f.read().strip() + if pk: + logging.info("Private key loaded successfully from file.") + else: + logging.warning("Private key file is empty.") + except FileNotFoundError: + logging.warning("Private key file not found.") + except Exception as e: + logging.error(f"Error reading private key file: {str(e)}") + if not pk: + logging.error("Private key not found in environment variables. Will not be able to sign transactions.") + # send TG warning message + await telegram_utils.send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.") + return pk \ No newline at end of file diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py index b83b944..bc6b9ff 100644 --- a/crypto/sol/modules/webui.py +++ b/crypto/sol/modules/webui.py @@ -2,9 +2,10 @@ from flask import Flask, jsonify, request, render_template, redirect, url_for # from flask_oauthlib.client import OAuth from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user import secrets -from modules import storage +from modules import storage, utils, SolanaAPI import os + def init_app(): app = Flask(__name__, template_folder='../templates', static_folder='../static') app.config['SECRET_KEY'] = 'your-secret-key' @@ -117,11 +118,11 @@ def init_app(): return jsonify({"error": "No log files found"}), 404 try: - logger.info(f"Processing latest log file: {latest_log_file}") + utils.log.info(f"Processing latest log file: {latest_log_file}") with open(latest_log_file, 'r') as f: log = json.load(f) - result = await process_log(log) + result = await SolanaAPI.process_log(log) return jsonify({ "file": latest_log_file, @@ -130,7 +131,7 @@ def init_app(): }), 200 except Exception as e: - logging.error(f"Error processing log dump: {e}") + utils.log.error(f"Error processing log dump: {e}") return jsonify({"error": "Failed to process log"}), 500 @@ -141,7 +142,7 @@ def init_app(): @app.route('/tr//', methods=['GET', 'POST']) async def transaction_notified(wallet, tx_signature): try: - logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}") + utils.log.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}") # Process the transaction # tr = await get_swap_transaction_details(tx_signature) tr = await get_transaction_details_info(tx_signature, []) @@ -152,7 +153,7 @@ def init_app(): await save_token_info() return jsonify(tr), 200 except Exception as e: - logging.error(f"Error processing transaction: {e}") + utils.log.error(f"Error processing transaction: {e}") return jsonify({"error": "Failed to process transaction"}), 500 @@ -174,5 +175,5 @@ def get_latest_log_file(): latest_file = max(files, key=lambda x: os.path.getmtime(os.path.join(log_dir, x))) return os.path.join(log_dir, latest_file) except Exception as e: - logging.error(f"Error fetching latest log file: {e}") + utils.log.error(f"Error fetching latest log file: {e}") return None