store transacrtions for replay

This commit is contained in:
Dobromir Popov 2024-10-04 13:09:32 +03:00
parent a901697ccb
commit c952edb363
3 changed files with 96 additions and 77 deletions

1
.env
View File

@ -28,3 +28,4 @@ OPENAI_API_KEY=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
# List models available from Groq # List models available from Groq
# aider --models groq/ # aider --models groq/
SUBSCRIPTION_ID='1518430'

View File

@ -15,7 +15,7 @@ import datetime
import logging import logging
import base64 import base64
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv,set_key
import aiohttp import aiohttp
from typing import List, Dict from typing import List, Dict
import requests import requests
@ -24,6 +24,38 @@ import requests
load_dotenv() load_dotenv()
app = Flask(__name__) app = Flask(__name__)
# Function to find the latest log file
def get_latest_log_file():
log_dir = './logs'
try:
files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))]
latest_file = max(files, key=lambda x: os.path.getctime(os.path.join(log_dir, x)))
return os.path.join(log_dir, latest_file)
except Exception as e:
logging.error(f"Error fetching latest log file: {e}")
return None
# Flask route to retry processing the last log
@app.route('/retry-last-log', methods=['GET'])
def retry_last_log():
latest_log_file = get_latest_log_file()
if not latest_log_file:
return jsonify({"error": "No log files found"}), 404
try:
with open(latest_log_file, 'r') as f:
log = json.load(f)
# Run the asynchronous process_log function
asyncio.run(process_log(log))
return jsonify({"status": "Log processed successfully"}), 200
except Exception as e:
logging.error(f"Error processing log: {e}")
return jsonify({"error": "Failed to process log"}), 500
# Use the production Solana RPC endpoint # Use the production Solana RPC endpoint
solana_client = AsyncClient("https://api.mainnet-beta.solana.com") solana_client = AsyncClient("https://api.mainnet-beta.solana.com")
dexscreener_client = DexscreenerClient() dexscreener_client = DexscreenerClient()
@ -442,75 +474,64 @@ def perform_swap(input_token, output_token, amount):
"error": str(e) "error": str(e)
} }
from solders.pubkey import Pubkey
from solders.transaction import Transaction
from solders.signature import Signature
async def on_logs(log):
print(f"Received log: {log}")
async def save_log(log):
try: try:
# Save json to ./logs os.makedirs('./logs', exist_ok=True)
if not os.path.exists('./logs'):
os.makedirs('./logs')
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename = f"./logs/log_{timestamp}.json" filename = f"./logs/log_{timestamp}.json"
with open(filename, 'w') as f: with open(filename, 'w') as f:
json.dump(log, f, indent=2) json.dump(log, f, indent=2)
except Exception as e: except Exception as e:
logger.error(f"Error saving RPC log: {e}") logging.error(f"Error saving RPC log: {e}")
try: async def process_log(log_result):
if 'err' in log and log['err']: if log_result['value']['err']:
return return
if 'value' in log and 'logs' in log['value']: tx_signature_str = log_result['value']['signature']
tx_signature_str = log['value']['signature'] logs = log_result['value']['logs']
logs = log['value']['logs']
try: try:
# Fetch transaction details # Convert the base58 signature string to bytes
from solana.publickey import PublicKey tx_signature = Signature(b58decode(tx_signature_str))
tx_result = await solana_client.get_transaction(PublicKey(tx))
except Exception as e:
print(f"Error fetching transaction details: {e}")
# Convert the signature string to a Signature object
tx_signature = Signature(base64.b64decode(tx_signature_str))
# Fetch transaction details # Fetch transaction details
tx_result = await solana_client.get_transaction(tx_signature) tx_result = await solana_client.get_transaction(tx_signature)
if tx_result and tx_result.value:
if tx_result and 'result' in tx_result and tx_result['result']: transaction = Transaction.from_json(tx_result.value)
transaction = tx_result['result']['transaction'] message = transaction.message
message = transaction['message']
for log_entry in logs: for log_entry in logs:
if 'Program log: Instruction: Swap' in log_entry: if 'Program log: Instruction: Swap' in log_entry:
# Handle swap event for instruction in message.instructions:
for instruction in message['instructions']: if instruction.program_id == TOKEN_ADDRESSES['SOL']:
if instruction['programId'] == TOKEN_ADDRESSES['SOL']: from_pubkey = instruction.accounts[0]
# This is a token transfer to_pubkey = instruction.accounts[1]
from_pubkey = instruction['accounts'][0] amount = int(instruction.data, 16) / 1e9
to_pubkey = instruction['accounts'][1]
amount = int(instruction['data'], 16) / 1e9 # Convert lamports to SOL
if from_pubkey == FOLLOWED_WALLET: if from_pubkey == FOLLOWED_WALLET:
move = { move = {
'token': 'SOL', 'token': 'SOL',
'amount': amount, 'amount': amount,
'to_token': 'Unknown' # You might want to determine this based on the receiving address 'to_token': 'Unknown'
} }
# Send a Telegram message about the swap
message_text = f"Swap detected:\nFrom: {from_pubkey}\nTo: {to_pubkey}\nAmount: {amount} SOL" message_text = f"Swap detected:\nFrom: {from_pubkey}\nTo: {to_pubkey}\nAmount: {amount} SOL"
await send_telegram_message(message_text) await send_telegram_message(message_text)
await follow_move(move) await follow_move(move)
else:
print(f"Unexpected log format: {log}")
except Exception as e: except Exception as e:
print(f"Error processing RPC log") logging.error(f"Error processing log: {e}")
logger.error(f"An unexpected error occurred: {e}")
async def on_logs(log):
logging.debug(f"Received log: {log}")
await save_log(log)
await process_log(log)
async def subscribe_to_wallet(): async def subscribe_to_wallet():
@ -531,14 +552,7 @@ async def subscribe_to_wallet():
subscription_id = await load_subscription_id() subscription_id = await load_subscription_id()
if subscription_id:
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "logsSubscribe",
"params": [subscription_id]
}
else:
request = { request = {
"jsonrpc": "2.0", "jsonrpc": "2.0",
"id": 1, "id": 1,
@ -560,6 +574,7 @@ async def subscribe_to_wallet():
try: try:
response = await websocket.recv() response = await websocket.recv()
response_data = json.loads(response) response_data = json.loads(response)
logger.debug(f"Received response: {response_data}")
if 'result' in response_data: if 'result' in response_data:
subscription_id = response_data['result'] subscription_id = response_data['result']
await save_subscription_id(subscription_id) await save_subscription_id(subscription_id)
@ -601,4 +616,6 @@ async def main():
await subscribe_to_wallet() await subscribe_to_wallet()
if __name__ == '__main__': if __name__ == '__main__':
app.run(debug=True,port=3001)
asyncio.run(main()) asyncio.run(main())

View File

@ -4,6 +4,7 @@ To run this Python Solana agent:
Install the required libraries: Install the required libraries:
`pip install flask solana dexscreener python-telegram-bot asyncio base58 aiohttp` `pip install flask solana dexscreener python-telegram-bot asyncio base58 aiohttp`
pip install flask dexscreener python-telegram-bot aiohttp requests dotenv websockets solders solana
Replace REPLACE_WITH_WALLET_ADDRESS with the wallet address you want to follow. Replace REPLACE_WITH_WALLET_ADDRESS with the wallet address you want to follow.
Replace REPLACE_WITH_YOUR_WALLET_ADDRESS with your own wallet address. Replace REPLACE_WITH_YOUR_WALLET_ADDRESS with your own wallet address.