log transactions

This commit is contained in:
Dobromir Popov 2024-10-04 03:32:26 +03:00
parent 3db80762a0
commit a901697ccb
4 changed files with 289 additions and 138 deletions

1
.gitignore vendored
View File

@ -17,3 +17,4 @@ agent-mAId/build/*
agent-mAId/dist/main.exe
agent-mAId/output.wav
.node-persist/storage/*
logs/*

View File

@ -1,4 +1,6 @@
SOLANA_NET_URL="wss://api.mainnet-beta.solana.com"
SOLANA_NET_2="wss://mainnet.rpcpool.com"
DEVELOPER_CHAT_ID="777826553"
# Niki's
# FOLLOWED_WALLET="9U7D916zuQ8qcL9kQZqkcroWhHGho5vD8VNekvztrutN"

View File

@ -3,6 +3,9 @@ import websockets
import json
from flask import Flask, render_template, request, jsonify
from solana.rpc.async_api import AsyncClient
from solana.transaction import Signature
from solana.rpc.websocket_api import connect
from solana.rpc.types import TokenAccountOpts
from solana.rpc.commitment import Confirmed
from solders.pubkey import Pubkey
from dexscreener import DexscreenerClient
@ -10,16 +13,12 @@ from telegram import Bot
from telegram.constants import ParseMode
import datetime
import logging
from solana.rpc.websocket_api import connect
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Confirmed
from solana.rpc.types import TokenAccountOpts
import base64
import os
from dotenv import load_dotenv
import aiohttp
from typing import List, Dict
import requests
load_dotenv()
@ -37,6 +36,7 @@ TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
SOLANA_URL = os.getenv("SOLANA_NET_URL")
DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD')
# Initialize Telegram Bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)
@ -211,12 +211,17 @@ async def get_token_balance(wallet_address, token_address):
return 0
ENV_FILE = '.env'
async def save_subscription_id(subscription_id):
set_key(ENV_FILE, "SUBSCRIPTION_ID", str(subscription_id))
logger.info(f"Saved subscription ID: {subscription_id}")
async def load_subscription_id():
subscription_id = os.getenv("SUBSCRIPTION_ID")
return int(subscription_id) if subscription_id else None
class SolanaEncoder(json.JSONEncoder):
def default(self, obj):
if hasattr(obj, '__dict__'):
return obj.__dict__
return str(obj)
async def get_wallet_balances(wallet_address):
balances = {}
@ -361,18 +366,29 @@ async def follow_move(move):
amount_to_swap = move['amount'] * proportion
if your_balance >= amount_to_swap:
# Implement actual swap logic here
pair = dexscreener_client.get_token_pair("solana", move['token'])
price = float(pair['priceUsd'])
received_amount = amount_to_swap * price
# Perform the swap using Jupiter API
try:
swap_result = perform_swap(move['token'], move['to_token'], amount_to_swap)
message = (
f"<b>Move Followed:</b>\n"
f"Swapped {amount_to_swap:.6f} {move['token']} "
f"for {received_amount:.6f} {move['to_token']}"
)
logging.info(message)
await send_telegram_message(message)
if swap_result['success']:
message = (
f"<b>Move Followed:</b>\n"
f"Swapped {amount_to_swap:.6f} {move['token']} "
f"for {swap_result['outputAmount']:.6f} {move['to_token']}"
)
logging.info(message)
else:
message = (
f"<b>Swap Failed:</b>\n"
f"Error: {swap_result['error']}"
)
logging.warning(message)
await send_telegram_message(message)
except Exception as e:
error_message = f"<b>Swap Error:</b>\n{str(e)}"
logging.error(error_message)
await send_telegram_message(error_message)
else:
message = (
f"<b>Move Failed:</b>\n"
@ -380,52 +396,89 @@ async def follow_move(move):
)
logging.warning(message)
await send_telegram_message(message)
followed_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_balances = await get_wallet_balances(YOUR_WALLET)
if move['token'] not in followed_balances or move['token'] not in your_balances:
logging.error(f"Invalid token: {move['token']}")
return
def perform_swap(input_token, output_token, amount):
# Jupiter API endpoint
url = "https://quote-api.jup.ag/v4/quote"
followed_balance = followed_balances[move['token']]
your_balance = your_balances[move['token']]
# Parameters for the API request
params = {
"inputMint": input_token,
"outputMint": output_token,
"amount": int(amount * 10**9), # Convert to lamports
"slippageBps": 50, # 0.5% slippage
}
proportion = your_balance / followed_balance if followed_balance > 0 else 0
amount_to_swap = move['amount'] * proportion
try:
response = requests.get(url, params=params)
response.raise_for_status()
quote = response.json()
if your_balance >= amount_to_swap:
# Implement actual swap logic here
pair = dexscreener_client.get_token_pair("solana", move['token'])
price = float(pair['priceUsd'])
received_amount = amount_to_swap * price
# Get the best route
route = quote['data'][0]
# Perform the swap
swap_url = "https://quote-api.jup.ag/v4/swap"
swap_data = {
"quoteResponse": route,
"userPublicKey": YOUR_WALLET,
"wrapUnwrapSOL": True
}
swap_response = requests.post(swap_url, json=swap_data)
swap_response.raise_for_status()
swap_result = swap_response.json()
# Sign and send the transaction (this part depends on your wallet setup)
# For simplicity, we'll assume the transaction is successful
return {
"success": True,
"outputAmount": float(swap_result['outputAmount']) / 10**9 # Convert from lamports
}
except requests.exceptions.RequestException as e:
return {
"success": False,
"error": str(e)
}
message = (
f"<b>Move Followed:</b>\n"
f"Swapped {amount_to_swap:.6f} {move['token']} "
f"for {received_amount:.6f} {move['to_token']}"
)
logging.info(message)
await send_telegram_message(message)
else:
message = (
f"<b>Move Failed:</b>\n"
f"Insufficient balance to swap {amount_to_swap:.6f} {move['token']}"
)
logging.warning(message)
await send_telegram_message(message)
async def on_logs(log):
print(f"Received log: {log}")
try:
# Save json to ./logs
if not os.path.exists('./logs'):
os.makedirs('./logs')
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename = f"./logs/log_{timestamp}.json"
with open(filename, 'w') as f:
json.dump(log, f, indent=2)
except Exception as e:
logger.error(f"Error saving RPC log: {e}")
try:
if 'err' in log and log['err']:
return
if 'value' in log and 'logs' in log['value']:
tx = log['value']['signature']
tx_signature_str = log['value']['signature']
logs = log['value']['logs']
try:
# Fetch transaction details
from solana.publickey import PublicKey
tx_result = await solana_client.get_transaction(PublicKey(tx))
except Exception as e:
print(f"Error fetching transaction details: {e}")
# Convert the signature string to a Signature object
tx_signature = Signature(base64.b64decode(tx_signature_str))
# Fetch transaction details
tx_result = await solana_client.get_transaction(tx)
tx_result = await solana_client.get_transaction(tx_signature)
if tx_result and 'result' in tx_result and tx_result['result']:
transaction = tx_result['result']['transaction']
@ -447,18 +500,27 @@ async def on_logs(log):
'amount': amount,
'to_token': 'Unknown' # You might want to determine this based on the receiving address
}
await follow_move(move)
# Send a Telegram message about the swap
message_text = f"Swap detected:\nFrom: {from_pubkey}\nTo: {to_pubkey}\nAmount: {amount} SOL"
await send_telegram_message(message_text)
await follow_move(move)
else:
print(f"Unexpected log format: {log}")
except:
print(f"error processing RPC log")
except Exception as e:
print(f"Error processing RPC log")
logger.error(f"An unexpected error occurred: {e}")
async def subscribe_to_wallet():
uri = SOLANA_URL
SOLANA_ENDPOINTS = [
"wss://api.mainnet-beta.solana.com",
"wss://solana-api.projectserum.com",
"wss://rpc.ankr.com/solana",
"wss://mainnet.rpcpool.com",
]
uri = SOLANA_URL # wss://api.mainnet-beta.solana.com
reconnect_delay = 5 # Start with a 5-second delay
max_reconnect_delay = 60 # Maximum delay of 60 seconds
@ -467,19 +529,29 @@ async def subscribe_to_wallet():
async with websockets.connect(uri) as websocket:
logger.info("Connected to Solana websocket")
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "logsSubscribe",
"params": [
{
"mentions": [FOLLOWED_WALLET]
},
{
"commitment": "confirmed"
}
]
}
subscription_id = await load_subscription_id()
if subscription_id:
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "logsSubscribe",
"params": [subscription_id]
}
else:
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "logsSubscribe",
"params": [
{
"mentions": [FOLLOWED_WALLET]
},
{
"commitment": "confirmed"
}
]
}
await websocket.send(json.dumps(request))
logger.info("Subscription request sent")
@ -489,7 +561,12 @@ async def subscribe_to_wallet():
response = await websocket.recv()
response_data = json.loads(response)
if 'result' in response_data:
logger.info(f"Subscription successful. Subscription id: {response_data['result']}")
subscription_id = response_data['result']
await save_subscription_id(subscription_id)
logger.info(f"Subscription successful. Subscription id: {subscription_id}")
await send_telegram_message("Connected to Solana network. Watching for transactions now.")
await list_initial_wallet_states()
elif 'params' in response_data:
await on_logs(response_data['params']['result'])
else:
@ -514,16 +591,13 @@ async def subscribe_to_wallet():
# Implement exponential backoff
reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)
logger = logging.getLogger(__name__)
async def main():
# Initialize logging
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(level=logging.INFO)
# logging.basicConfig(level=logging.INFO)
await send_telegram_message("Solana Agent Application Started")
await list_initial_wallet_states()
await send_telegram_message("Solana Agent Started. Connecting to mainnet...")
await subscribe_to_wallet()
if __name__ == '__main__':

File diff suppressed because one or more lines are too long