From c952edb36366ff5733b30258483568c84c5a8d59 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Fri, 4 Oct 2024 13:09:32 +0300 Subject: [PATCH] store transacrtions for replay --- .env | 3 +- crypto/sol/app.py | 169 ++++++++++++++++++++++++------------------- crypto/sol/readme.md | 1 + 3 files changed, 96 insertions(+), 77 deletions(-) diff --git a/.env b/.env index 7e9f218..044dfd2 100644 --- a/.env +++ b/.env @@ -27,4 +27,5 @@ OPENAI_API_KEY=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 # aider --model groq/llama3-70b-8192 # List models available from Groq -# aider --models groq/ \ No newline at end of file +# aider --models groq/ +SUBSCRIPTION_ID='1518430' diff --git a/crypto/sol/app.py b/crypto/sol/app.py index cb3eff9..4d58780 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -15,7 +15,7 @@ import datetime import logging import base64 import os -from dotenv import load_dotenv +from dotenv import load_dotenv,set_key import aiohttp from typing import List, Dict import requests @@ -24,6 +24,38 @@ import requests load_dotenv() app = Flask(__name__) +# Function to find the latest log file +def get_latest_log_file(): + log_dir = './logs' + try: + files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))] + latest_file = max(files, key=lambda x: os.path.getctime(os.path.join(log_dir, x))) + return os.path.join(log_dir, latest_file) + except Exception as e: + logging.error(f"Error fetching latest log file: {e}") + return None + +# Flask route to retry processing the last log +@app.route('/retry-last-log', methods=['GET']) +def retry_last_log(): + latest_log_file = get_latest_log_file() + if not latest_log_file: + return jsonify({"error": "No log files found"}), 404 + + try: + with open(latest_log_file, 'r') as f: + log = json.load(f) + + # Run the asynchronous process_log function + asyncio.run(process_log(log)) + return jsonify({"status": "Log processed successfully"}), 200 + + except Exception as e: + logging.error(f"Error processing log: {e}") + return jsonify({"error": "Failed to process log"}), 500 + + + # Use the production Solana RPC endpoint solana_client = AsyncClient("https://api.mainnet-beta.solana.com") dexscreener_client = DexscreenerClient() @@ -442,75 +474,64 @@ def perform_swap(input_token, output_token, amount): "error": str(e) } +from solders.pubkey import Pubkey +from solders.transaction import Transaction +from solders.signature import Signature -async def on_logs(log): - print(f"Received log: {log}") + + +async def save_log(log): try: - # Save json to ./logs - if not os.path.exists('./logs'): - os.makedirs('./logs') - + os.makedirs('./logs', exist_ok=True) timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") filename = f"./logs/log_{timestamp}.json" with open(filename, 'w') as f: json.dump(log, f, indent=2) except Exception as e: - logger.error(f"Error saving RPC log: {e}") + logging.error(f"Error saving RPC log: {e}") - try: - if 'err' in log and log['err']: - return +async def process_log(log_result): + if log_result['value']['err']: + return - if 'value' in log and 'logs' in log['value']: - tx_signature_str = log['value']['signature'] - logs = log['value']['logs'] + tx_signature_str = log_result['value']['signature'] + logs = log_result['value']['logs'] - try: - # Fetch transaction details - from solana.publickey import PublicKey - tx_result = await solana_client.get_transaction(PublicKey(tx)) - except Exception as e: - print(f"Error fetching transaction details: {e}") - + try: + # Convert the base58 signature string to bytes + tx_signature = Signature(b58decode(tx_signature_str)) - # Convert the signature string to a Signature object - tx_signature = Signature(base64.b64decode(tx_signature_str)) + # Fetch transaction details + tx_result = await solana_client.get_transaction(tx_signature) + if tx_result and tx_result.value: + transaction = Transaction.from_json(tx_result.value) + message = transaction.message - # Fetch transaction details - tx_result = await solana_client.get_transaction(tx_signature) + for log_entry in logs: + if 'Program log: Instruction: Swap' in log_entry: + for instruction in message.instructions: + if instruction.program_id == TOKEN_ADDRESSES['SOL']: + from_pubkey = instruction.accounts[0] + to_pubkey = instruction.accounts[1] + amount = int(instruction.data, 16) / 1e9 - if tx_result and 'result' in tx_result and tx_result['result']: - transaction = tx_result['result']['transaction'] - message = transaction['message'] - - for log_entry in logs: - if 'Program log: Instruction: Swap' in log_entry: - # Handle swap event - for instruction in message['instructions']: - if instruction['programId'] == TOKEN_ADDRESSES['SOL']: - # This is a token transfer - from_pubkey = instruction['accounts'][0] - to_pubkey = instruction['accounts'][1] - amount = int(instruction['data'], 16) / 1e9 # Convert lamports to SOL - - if from_pubkey == FOLLOWED_WALLET: - move = { - 'token': 'SOL', - 'amount': amount, - 'to_token': 'Unknown' # You might want to determine this based on the receiving address - } - # Send a Telegram message about the swap - message_text = f"Swap detected:\nFrom: {from_pubkey}\nTo: {to_pubkey}\nAmount: {amount} SOL" - await send_telegram_message(message_text) - await follow_move(move) - else: - print(f"Unexpected log format: {log}") + if from_pubkey == FOLLOWED_WALLET: + move = { + 'token': 'SOL', + 'amount': amount, + 'to_token': 'Unknown' + } + message_text = f"Swap detected:\nFrom: {from_pubkey}\nTo: {to_pubkey}\nAmount: {amount} SOL" + await send_telegram_message(message_text) + await follow_move(move) except Exception as e: - print(f"Error processing RPC log") - logger.error(f"An unexpected error occurred: {e}") - + logging.error(f"Error processing log: {e}") +async def on_logs(log): + logging.debug(f"Received log: {log}") + await save_log(log) + await process_log(log) async def subscribe_to_wallet(): @@ -531,27 +552,20 @@ async def subscribe_to_wallet(): subscription_id = await load_subscription_id() - if subscription_id: - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsSubscribe", - "params": [subscription_id] - } - else: - request = { - "jsonrpc": "2.0", - "id": 1, - "method": "logsSubscribe", - "params": [ - { - "mentions": [FOLLOWED_WALLET] - }, - { - "commitment": "confirmed" - } - ] - } + + request = { + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ + { + "mentions": [FOLLOWED_WALLET] + }, + { + "commitment": "confirmed" + } + ] + } await websocket.send(json.dumps(request)) logger.info("Subscription request sent") @@ -560,6 +574,7 @@ async def subscribe_to_wallet(): try: response = await websocket.recv() response_data = json.loads(response) + logger.debug(f"Received response: {response_data}") if 'result' in response_data: subscription_id = response_data['result'] await save_subscription_id(subscription_id) @@ -601,4 +616,6 @@ async def main(): await subscribe_to_wallet() if __name__ == '__main__': - asyncio.run(main()) \ No newline at end of file + + app.run(debug=True,port=3001) + asyncio.run(main()) diff --git a/crypto/sol/readme.md b/crypto/sol/readme.md index 6aaad30..d4c989d 100644 --- a/crypto/sol/readme.md +++ b/crypto/sol/readme.md @@ -4,6 +4,7 @@ To run this Python Solana agent: Install the required libraries: `pip install flask solana dexscreener python-telegram-bot asyncio base58 aiohttp` +pip install flask dexscreener python-telegram-bot aiohttp requests dotenv websockets solders solana Replace REPLACE_WITH_WALLET_ADDRESS with the wallet address you want to follow. Replace REPLACE_WITH_YOUR_WALLET_ADDRESS with your own wallet address.