diff --git a/.env b/.env index 044dfd2..1f066e6 100644 --- a/.env +++ b/.env @@ -28,4 +28,4 @@ OPENAI_API_KEY=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 # List models available from Groq # aider --models groq/ -SUBSCRIPTION_ID='1518430' +SUBSCRIPTION_ID='2217755' diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 1d54feb..17747e2 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -19,6 +19,7 @@ from dotenv import load_dotenv,set_key import aiohttp from typing import List, Dict import requests +import threading load_dotenv() @@ -28,7 +29,10 @@ app = Flask(__name__) def get_latest_log_file(): log_dir = './logs' try: - files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))] + # files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))] + # filter files mask log_20241005_004103_143116.json + files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('log_')] + latest_file = max(files, key=lambda x: os.path.getctime(os.path.join(log_dir, x))) return os.path.join(log_dir, latest_file) except Exception as e: @@ -242,7 +246,7 @@ async def get_token_balance(wallet_address, token_address): logging.debug(f"No account found for {token_address} in {wallet_address}") return 0 except Exception as e: - logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)}") + logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}") return 0 @@ -498,15 +502,6 @@ async def get_transaction_details_rpc(tx_signature): } ] } - # request = { - # "jsonrpc": "2.0", - # "id": 1, - # "method": "getConfirmedTransaction", - # "params": [ - # tx_signature, - # "json" - # ] - # } try: response = requests.post(url, headers=headers, data=json.dumps(data)) response.raise_for_status() # Raises an error for bad responses @@ -533,6 +528,35 @@ async def save_log(log): logging.error(f"Error saving RPC log: {e}") +def determine_token(pubkey, watched_tokens): + # Check if the pubkey matches any watched token addresses + for token, address in watched_tokens.items(): + if pubkey == address: + return token + return "Unknown" + +def parse_amount_from_logs(logs): + amount_in = 0 + amount_out = 0 + + for log in logs: + if 'SwapEvent' in log: + # Extract amounts from the log line + parts = log.split('amount_in: ')[1].split(', amount_out: ') + amount_in = int(parts[0]) + amount_out = int(parts[1].split(' ')[0]) + + return amount_in, amount_out + +def extract_swap_details(instruction, logs, watched_tokens): + # Extract source and target tokens along with amounts + from_pubkey = instruction['accounts'][0] + to_pubkey = instruction['accounts'][1] + amount_in, amount_out = parse_amount_from_logs(logs) + return from_pubkey, to_pubkey, amount_in, amount_out + + + async def process_log(log_result): if log_result['value']['err']: return @@ -541,45 +565,96 @@ async def process_log(log_result): logs = log_result['value']['logs'] try: - - try: - transaction = await get_transaction_details_rpc(tx_signature_str) - except Exception as e: - logging.error(f"Error fetching transaction details: {e}") - return - - - - # Convert the base58 signature string to bytes - tx_signature = Signature(b58decode(tx_signature_str)) - # Fetch transaction details - tx_result = await solana_client.get_transaction(tx_signature, max_supported_transaction_version=0) - #tx_result = await get_transaction_details(tx_signature_str) - if tx_result.value is None: - logging.error(f"Transaction not found: {tx_signature_str}") - return - transaction = tx_result.value.transaction + # Detect swap operations in logs + swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2'] for log_entry in logs: - if 'Program log: Instruction: Swap' in log_entry: - for instruction in message.instructions: - if instruction.program_id == TOKEN_ADDRESSES['SOL']: - from_pubkey = instruction.accounts[0] - to_pubkey = instruction.accounts[1] - amount = int(instruction.data, 16) / 1e9 + if any(op in log_entry for op in swap_operations): + try: + # ++ OLD using solana-py + # # Convert the base58 signature string to bytes + # tx_signature = Signature(b58decode(tx_signature_str)) + # # Fetch transaction details + # tx_result = await solana_client.get_transaction(tx_signature, max_supported_transaction_version=0) + # #tx_result = await get_transaction_details(tx_signature_str) + # if tx_result.value is None: + # logging.error(f"Transaction not found: {tx_signature_str}") + # return + # transaction2 = tx_result.value.transaction + # -- OLD using solana-py + + watched_tokens = await get_non_zero_token_balances(FOLLOWED_WALLET) + details = parse_swap_logs(logs) + transaction = await get_transaction_details_rpc(tx_signature_str) - if from_pubkey == FOLLOWED_WALLET: + instructions = transaction['transaction']['message']['instructions'] + + for instruction in instructions: + from_pubkey, to_pubkey, amount_in, amount_out = extract_swap_details(instruction, logs, watched_tokens) + + if from_pubkey in watched_tokens.values() or to_pubkey in watched_tokens.values(): + from_token = determine_token(from_pubkey, watched_tokens) + to_token = determine_token(to_pubkey, watched_tokens) + move = { - 'token': 'SOL', - 'amount': amount, - 'to_token': 'Unknown' + 'token': from_token, + 'amount': amount_in, + 'to_token': to_token } - message_text = f"Swap detected:\nFrom: {from_pubkey}\nTo: {to_pubkey}\nAmount: {amount} SOL" + message_text = ( + f"Swap detected:\n" + f"From: {from_pubkey} ({from_token})\n" + f"To: {to_pubkey} ({to_token})\n" + f"Amount In: {amount_in}\n" + f"Amount Out: {amount_out}" + ) await send_telegram_message(message_text) await follow_move(move) + + + except Exception as e: + logging.error(f"Error fetching transaction details: {e}") + return + except Exception as e: logging.error(f"Error processing log: {e}") +def parse_swap_logs(logs): + swap_details = { + "source_token_address": "", + "destination_token_address": "", + "amount_in": 0, + "amount_out": 0 + } + + for log in logs: + if "SwapEvent" in log: + # Extract amounts from SwapEvent + parts = log.split("amount_in: ")[1].split(", amount_out: ") + swap_details["amount_in"] = int(parts[0]) + swap_details["amount_out"] = int(parts[1].split(" ")[0]) + + if "source_token_change:" in log: + # Extract source and destination token changes + changes = log.split(", ") + for change in changes: + key, value = change.split(": ") + if key == "source_token_change": + swap_details["amount_in"] = int(value) + elif key == "destination_token_change": + swap_details["amount_out"] = int(value) + + if "Program log:" in log and len(log.split()) == 2: + # Extract token addresses (assuming they are logged as single entries) + token_address = log.split(": ")[1] + if not swap_details["source_token_address"]: + swap_details["source_token_address"] = token_address + elif not swap_details["destination_token_address"]: + swap_details["destination_token_address"] = token_address + + return swap_details + + async def on_logs(log): logging.debug(f"Received log: {log}") await save_log(log) @@ -662,15 +737,28 @@ async def subscribe_to_wallet(): logger = logging.getLogger(__name__) + async def main(): # Initialize logging logging.basicConfig(level=logging.DEBUG) - # logging.basicConfig(level=logging.INFO) - await send_telegram_message("Solana Agent Started. Connecting to mainnet...") - #await subscribe_to_wallet() + await subscribe_to_wallet() + +def run_flask(): + # Run Flask app without the reloader, so we can run the async main function + app.run(debug=False, port=3001, use_reloader=False) if __name__ == '__main__': - - app.run(debug=True,port=3001) + # Start Flask in a separate thread + flask_thread = threading.Thread(target=run_flask) + flask_thread.start() + + # Create an event loop for the async tasks + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) + # Start Flask in a separate thread + flask_thread = threading.Thread(target=run_flask) + flask_thread.start() + + # Run the async main function asyncio.run(main()) diff --git a/crypto/sol/transactionExample.json b/crypto/sol/transactionExample.json index 525ad4c..fbbc586 100644 --- a/crypto/sol/transactionExample.json +++ b/crypto/sol/transactionExample.json @@ -1,4 +1,7 @@ -{'blockTime': 1728035375, 'meta': {'computeUnitsConsumed': 128099, 'err': None, 'fee': 97127, 'innerInstructions': [ +{ + 'blockTime': 1728035375, +'meta': {'computeUnitsConsumed': 128099, +'err': None, 'fee': 97127, 'innerInstructions': [ {'index': 4, 'instructions': [ {'accounts': ['7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV', 'A1BBtTYJd4i3xU8D6Tc2FzU6ZN4oXZWXKZnCxwbHXr8x', '7LVHkVPVosXoWgH1PykWa5PjsfnyBviCdxC8ZUHJDy5U', '8Ggb8th8pZv7eJbRmwkoyrDbSvwLPvMQN7QuHwjm113z', 'icV5K6iC8J7yyeP9YnJLH2jPRJYT7dBmzm844STxHkP', 'BAxX7eMuSoxF8E4s1Xz5ukcLPHjvfC8Wv9JjSZci7tZ7', 'Gvjgjv63zcQxLbcG2iYF3vcJ2nudRLEffN6hoo8p6Ewy', 'HtsJ5S6K4NM2vXaWZ8k49JAggBgPGrryJ21ezdPBoUC6', 'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA', 'TokenzQdBNbLqP5VEhdkAS6EPFLC1PHnBqCXEpPxuEb', 'MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr', 'Faf89929Ni9fbg4gmVZTca7eW6NFg877Jqn6MizT3Gvw', 'So11111111111111111111111111111111111111112', 'DeRo9XMsizey3KWBWS3LHomFC7ZDny1NkFMZUEHgyX8a', 'E6ef1fYRvZ8ejtbdwaK1CDNAgpJxkWZ2pQsL4jaShjNU', '423toqoYT7obTRx8qmwVAeYZfMFRxkwwDzRAwJBd86K3' ], 'data': 'ASCsAbe1UnDmvdhmjnwFdPMzcfkrayiNaFZ61j7FiXYFR5MbydQDnzNL', 'programId': 'CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK', 'stackHeight': 2