# gogo2/crypto/sol/app.py
# 2024-10-05 19:04:36 +03:00
#
# 765 lines
# 29 KiB
# Python

import asyncio
import websockets
import json
from flask import Flask, render_template, request, jsonify
from solana.rpc.async_api import AsyncClient
from solana.transaction import Signature
from solana.rpc.websocket_api import connect
from solana.rpc.types import TokenAccountOpts, TxOpts
from solana.rpc.commitment import Confirmed
from solders.pubkey import Pubkey
from dexscreener import DexscreenerClient
from telegram import Bot
from telegram.constants import ParseMode
import datetime
import logging
import base64
import os
from dotenv import load_dotenv,set_key
import aiohttp
from typing import List, Dict
import requests
import threading
# Load variables from a local .env file into the process environment so the
# os.getenv() lookups in the configuration section below can see them.
load_dotenv()
# Flask application object; HTTP routes in this module are registered on it.
app = Flask(__name__)
def get_latest_log_file():
    """Return the path of the most recently changed ``log_*`` file in ./logs.

    Only regular files whose name starts with ``log_`` are considered
    (e.g. ``log_20241005_004103_143116.json``); the winner is the one with
    the greatest ``os.path.getctime``.

    Returns:
        The path joined onto ``./logs``, or ``None`` when the directory is
        missing, holds no matching file, or any other error occurs (the
        error is logged).
    """
    log_dir = './logs'

    def _changed_at(name):
        return os.path.getctime(os.path.join(log_dir, name))

    try:
        candidates = (
            entry
            for entry in os.listdir(log_dir)
            if os.path.isfile(os.path.join(log_dir, entry)) and entry.startswith('log_')
        )
        # max() raises ValueError on an empty sequence; that is handled below.
        newest = max(candidates, key=_changed_at)
        return os.path.join(log_dir, newest)
    except Exception as e:
        logging.error(f"Error fetching latest log file: {e}")
        return None
# Flask route to retry processing the last log
@app.route('/retry-last-log', methods=['GET'])
def retry_last_log():
    """Re-run ``process_log`` on the newest saved log file.

    Responds with 404 when no log file can be found, 500 when loading or
    processing the file fails (the error is logged), and 200 otherwise.
    """
    log_path = get_latest_log_file()
    if not log_path:
        return jsonify({"error": "No log files found"}), 404

    try:
        with open(log_path, 'r') as handle:
            saved_log = json.load(handle)
        # process_log is a coroutine; drive it to completion on a fresh event loop.
        asyncio.run(process_log(saved_log))
        success_body = jsonify({"status": "Log processed successfully"})
        return success_body, 200
    except Exception as e:
        logging.error(f"Error processing log: {e}")
        failure_body = jsonify({"error": "Failed to process log"})
        return failure_body, 500
# Configuration: every value below is read from the process environment.
# os.getenv returns None for a variable that is not set.
DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID")  # Telegram chat that receives notifications
FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET")  # wallet whose swaps are watched
YOUR_WALLET = os.getenv("YOUR_WALLET")  # wallet used when following a move
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
SOLANA_WS_URL = os.getenv("SOLANA_WS_URL")  # websocket endpoint used for logsSubscribe
SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL")  # HTTP JSON-RPC endpoint
DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD')  # currency for displayed values; defaults to USD
# Async RPC client bound to the endpoint configured in SOLANA_HTTP_URL.
solana_client = AsyncClient(SOLANA_HTTP_URL)
dexscreener_client = DexscreenerClient()
# Initialize Telegram Bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)
# Token addresses (initialize with some known tokens).
# Maps a token label to its mint address.
# NOTE(review): the TARD address is entirely lowercase, unlike the two entries
# above it; base58 addresses are case-sensitive, so it may have been
# lowercased by mistake — confirm against the real mint.
TOKEN_ADDRESSES = {
    "SOL": "So11111111111111111111111111111111111111112",
    "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
    "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og"
}
async def send_telegram_message(message):
    """Send *message* to the developer chat using HTML parse mode.

    Best-effort: any exception raised while sending is logged and swallowed,
    so callers never see a failure from this function.
    """
    try:
        await bot.send_message(
            chat_id=DEVELOPER_CHAT_ID,
            text=message,
            parse_mode=ParseMode.HTML,
        )
        logging.info(f"Telegram message sent: {message}")
    except Exception as send_error:
        logging.error(f"Error sending Telegram message: {str(send_error)}")
# async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
# url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
# params = {
# "contract_addresses": ",".join(token_addresses),
# "vs_currencies": DISPLAY_CURRENCY.lower()
# }
# prices = {}
# async with aiohttp.ClientSession() as session:
# async with session.get(url, params=params) as response:
# if response.status == 200:
# data = await response.json()
# for address, price_info in data.items():
# if DISPLAY_CURRENCY.lower() in price_info:
# prices[address] = price_info[DISPLAY_CURRENCY.lower()]
# else:
# logging.error(f"Failed to get token prices. Status: {response.status}")
# # For tokens not found in CoinGecko, try to get price from a DEX or set a default value
# missing_tokens = set(token_addresses) - set(prices.keys())
# for token in missing_tokens:
# # You might want to implement a fallback method here, such as:
# # prices[token] = await get_price_from_dex(token)
# # For now, we'll set a default value
# prices[token] = 0.0
# logging.warning(f"Price not found for token {token}. Setting to 0.")
# return prices
async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]:
    """Return a price for every address in *token_addresses*.

    CoinGecko is asked first; addresses it does not return are looked up on
    DexScreener; anything still unresolved is reported with a warning and
    priced at 0.0.
    """
    prices = await get_prices_from_coingecko(token_addresses)

    # For tokens not found in CoinGecko, use DexScreener
    unresolved = set(token_addresses) - set(prices.keys())
    if unresolved:
        fallback_prices = await get_prices_from_dexscreener(list(unresolved))
        prices.update(fallback_prices)

    # If any tokens are still missing, set their prices to 0
    still_missing = set(token_addresses) - set(prices.keys())
    for token in still_missing:
        prices[token] = 0.0
        logging.warning(f"Price not found for token {token}. Setting to 0.")

    return prices
async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]:
    """Fetch prices for Solana token contracts from CoinGecko.

    Args:
        token_addresses: Token contract (mint) addresses to price.

    Returns:
        A mapping of address (as returned by CoinGecko) to its price in
        ``DISPLAY_CURRENCY``. Addresses CoinGecko does not know are simply
        absent. An empty dict is returned for empty input, for a non-200
        response, and for network/decoding errors (all of which are logged).
    """
    # Nothing to look up: skip the HTTP round trip, which would otherwise be
    # sent with an empty contract_addresses parameter.
    if not token_addresses:
        return {}

    url = "https://api.coingecko.com/api/v3/simple/token_price/solana"
    currency = DISPLAY_CURRENCY.lower()
    params = {
        "contract_addresses": ",".join(token_addresses),
        "vs_currencies": currency
    }
    prices: Dict[str, float] = {}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                if response.status != 200:
                    logging.error(f"Failed to get token prices from CoinGecko. Status: {response.status}")
                    return prices
                data = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # Treat a failed request like "no prices found" so callers can fall
        # back to another source instead of crashing.
        logging.error(f"Error fetching token prices from CoinGecko: {e}")
        return prices

    for address, price_info in data.items():
        if currency in price_info:
            prices[address] = price_info[currency]
    return prices
async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]:
    """Fetch token prices from the DexScreener token endpoint.

    One request per address is issued concurrently through
    ``fetch_token_data``. For each address the first returned pair is used.

    Args:
        token_addresses: Token (mint) addresses to price.

    Returns:
        A mapping of address to the first pair's ``priceUsd`` as a float.
        Addresses with no pairs, or whose first pair has no usable
        ``priceUsd``, are omitted and a warning is logged.

    NOTE(review): the value read is ``priceUsd``, so results are in USD even
    if DISPLAY_CURRENCY is set to something else — confirm this is intended.
    """
    if not token_addresses:
        return {}

    base_url = "https://api.dexscreener.com/latest/dex/tokens/"
    prices: Dict[str, float] = {}
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses]
        results = await asyncio.gather(*tasks)

    for address, result in zip(token_addresses, results):
        pairs = result.get('pairs') if isinstance(result, dict) else None
        if not pairs:
            logging.warning(f"No price data found on DexScreener for token {address}")
            continue
        try:
            # Use the first pair (usually the most liquid)
            prices[address] = float(pairs[0]['priceUsd'])
        except (KeyError, IndexError, TypeError, ValueError):
            # A pair without a numeric priceUsd must not abort the whole batch.
            logging.warning(f"No usable priceUsd on DexScreener for token {address}")
    return prices
async def fetch_token_data(session, url):
    """GET *url* through *session* and return the decoded JSON body.

    Returns ``None`` (after logging an error) when the response status is
    not 200 or when any exception is raised during the request.
    """
    try:
        async with session.get(url) as response:
            status = response.status
            if status != 200:
                logging.error(f"Failed to fetch data from {url}. Status: {response.status}")
                return None
            payload = await response.json()
            return payload
    except Exception as fetch_error:
        logging.error(f"Error fetching data from {url}: {str(fetch_error)}")
        return None
async def get_sol_price() -> float:
    """Return the SOL price in ``DISPLAY_CURRENCY``.

    CoinGecko is queried first. If it answers with a non-200 status, the
    request fails, or the expected keys are missing, the error is logged
    and the price is taken from DexScreener instead.

    This is the single definition of ``get_sol_price``: a second, later
    definition used to shadow this one and returned ``None`` on failure,
    which both disabled the fallback and broke ``amount * sol_price``
    arithmetic in callers.
    """
    currency = DISPLAY_CURRENCY.lower()
    url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={currency}"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    return data['solana'][currency]
                logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}")
    except (aiohttp.ClientError, asyncio.TimeoutError, KeyError, TypeError) as e:
        logging.error(f"Error getting SOL price from CoinGecko: {e}")
    return await get_sol_price_from_dexscreener()


async def get_sol_price_from_dexscreener() -> float:
    """Return the DexScreener price for wrapped SOL, or 0.0 when unavailable."""
    sol_address = "So11111111111111111111111111111111111111112"  # Solana's wrapped SOL address
    prices = await get_prices_from_dexscreener([sol_address])
    return prices.get(sol_address, 0.0)
async def convert_balances_to_currency(balances, token_prices, sol_price):
    """Convert token amounts into currency values.

    Args:
        balances: Mapping of token key to held amount. The key ``'SOL'`` is
            priced with *sol_price*; every other key is looked up in
            *token_prices*.
        token_prices: Mapping of token key to unit price.
        sol_price: Unit price of SOL, or ``None`` when it could not be
            determined.

    Returns:
        A mapping with the same keys as *balances*. Each value is
        ``amount * price``, or ``None`` when no price is available for that
        token (a warning is logged in that case).
    """
    converted_balances = {}
    for token, amount in balances.items():
        price = sol_price if token == 'SOL' else token_prices.get(token)
        if price is None:
            # Covers an unknown token as well as a failed SOL price lookup,
            # which would otherwise raise TypeError on multiplication.
            converted_balances[token] = None  # Price not available
            logging.warning(f"Price not available for token {token}")
        else:
            converted_balances[token] = amount * price
    return converted_balances
async def get_token_balance(wallet_address, token_address):
    """Return the UI amount of *token_address* held by *wallet_address*.

    Only the first token account returned for the mint is read.

    Args:
        wallet_address: Owner wallet address as a string.
        token_address: Token mint address as a string.

    Returns:
        The balance as a float, or ``0`` when the wallet has no account for
        the mint or when any error occurs (the error is logged).
    """
    try:
        # Use TokenAccountOpts and attribute access on the response, the same
        # client API that get_wallet_balances in this module relies on.
        response = await solana_client.get_token_accounts_by_owner(
            Pubkey.from_string(wallet_address),
            TokenAccountOpts(mint=Pubkey.from_string(token_address)),
        )
        accounts = response.value
        if not accounts:
            logging.debug(f"No account found for {token_address} in {wallet_address}")
            return 0

        balance = await solana_client.get_token_account_balance(accounts[0].pubkey)
        ui_amount = balance.value.ui_amount
        # The RPC may report a null UI amount; treat that as an empty balance.
        amount = float(ui_amount) if ui_amount is not None else 0.0
        logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}")
        return amount
    except Exception as e:
        logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
        return 0
# Dotenv file that persisted settings are written to.
ENV_FILE = '.env'


async def save_subscription_id(subscription_id):
    """Write *subscription_id* to ``ENV_FILE`` under the key SUBSCRIPTION_ID."""
    stored_value = str(subscription_id)
    set_key(ENV_FILE, "SUBSCRIPTION_ID", stored_value)
    logger.info(f"Saved subscription ID: {subscription_id}")
async def load_subscription_id():
    """Return the SUBSCRIPTION_ID environment variable as an int.

    Returns:
        The parsed integer, or ``None`` when the variable is unset, empty,
        or not a valid integer (the latter is logged as a warning instead
        of raising ``ValueError``).
    """
    raw_value = os.getenv("SUBSCRIPTION_ID")
    if not raw_value:
        return None
    try:
        return int(raw_value)
    except ValueError:
        logging.warning(f"Ignoring non-integer SUBSCRIPTION_ID: {raw_value!r}")
        return None
async def get_wallet_balances(wallet_address):
    """Collect the non-zero SPL token balances and the SOL balance of a wallet.

    Args:
        wallet_address: Wallet address as a string.

    Returns:
        A ``(balances, token_addresses)`` tuple. ``balances`` maps each mint
        with a positive UI amount to that amount, plus the key ``'SOL'`` for
        the native balance (lamports divided by 1e9). ``token_addresses`` is
        the list of those mints, excluding SOL. On an RPC error the error is
        logged and whatever was collected so far is returned.
    """
    balances = {}
    token_addresses = []
    logging.info(f"Getting balances for wallet: {wallet_address}")
    try:
        owner = Pubkey.from_string(wallet_address)
        response = await solana_client.get_token_accounts_by_owner_json_parsed(
            owner,
            opts=TokenAccountOpts(
                program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
            ),
            commitment=Confirmed
        )
        for account in response.value or []:
            parsed_data = account.account.data.parsed
            info = parsed_data.get('info') if isinstance(parsed_data, dict) else None
            if not (isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info):
                logging.warning(f"Unexpected data format for account: {account}")
                continue

            token_amount = info['tokenAmount']
            ui_amount = token_amount.get('uiAmount') if isinstance(token_amount, dict) else None
            if ui_amount is None:
                # A null uiAmount must not abort the scan of the remaining
                # accounts (float(None) would raise and skip the SOL lookup).
                continue

            amount = float(ui_amount)
            if amount > 0:
                mint = info['mint']
                token_addresses.append(mint)
                balances[mint] = amount
                logging.debug(f"Balance for {mint}: {amount}")

        sol_balance = await solana_client.get_balance(owner)
        if sol_balance.value is not None:
            balances['SOL'] = sol_balance.value / 1e9
        else:
            logging.warning(f"SOL balance response missing for wallet: {wallet_address}")
    except Exception as e:
        logging.error(f"Error getting wallet balances: {str(e)}")
    return balances, token_addresses
async def get_converted_balances(wallet_address):
    """Return the wallet's balances converted to currency values.

    Fetches the wallet balances, then token prices for the held mints, then
    the SOL price, and hands all three to ``convert_balances_to_currency``.
    """
    raw_balances, held_mints = await get_wallet_balances(wallet_address)
    prices_by_mint = await get_token_prices(held_mints)
    current_sol_price = await get_sol_price()
    return await convert_balances_to_currency(raw_balances, prices_by_mint, current_sol_price)
async def send_initial_wallet_states(followed_wallet, your_wallet):
    """Send a plain-text summary of both wallets' non-zero converted balances.

    Args:
        followed_wallet: Address of the wallet being followed.
        your_wallet: Address of the user's own wallet.

    The message is sent to ``DEVELOPER_CHAT_ID``, the chat id this module
    configures; the previously referenced ``CHAT_ID`` name is not defined
    anywhere and raised ``NameError``.
    """
    followed_balances = await get_converted_balances(followed_wallet)
    your_balances = await get_converted_balances(your_wallet)

    def _balance_lines(converted):
        # One "token: value" line per balance that has a positive value.
        return "".join(
            f"{token}: {amount:.2f}\n"
            for token, amount in converted.items()
            if amount and amount > 0
        )

    message = f"Initial Wallet States (Non-zero balances in {DISPLAY_CURRENCY}):\n\n"
    message += f"Followed Wallet ({followed_wallet}):\n"
    message += _balance_lines(followed_balances)
    message += f"\nYour Wallet ({your_wallet}):\n"
    message += _balance_lines(your_balances)
    message += "\nMonitored Tokens:\n"
    # Add monitored tokens logic here if needed
    await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message)
async def get_non_zero_token_balances(wallet_address):
    """Return the ``TOKEN_ADDRESSES`` entries the wallet holds a positive balance of.

    Each entry's balance is queried in turn with ``get_token_balance``; the
    result maps token key to the address stored in ``TOKEN_ADDRESSES``.
    """
    logging.info(f"Getting non-zero balances for wallet: {wallet_address}")
    held = {}
    for token, address in TOKEN_ADDRESSES.items():
        balance = await get_token_balance(wallet_address, address)
        if not balance > 0:
            continue
        held[token] = address
        logging.debug(f"Non-zero balance for {token}: {balance}")
    return held
async def list_initial_wallet_states():
    """Report both wallets' balances and refresh the monitored-token table.

    Fetches balances for ``FOLLOWED_WALLET`` and ``YOUR_WALLET``, converts
    them to ``DISPLAY_CURRENCY``, stores the totals in the globals
    ``FOLLOWED_WALLET_VALUE`` / ``YOUR_WALLET_VALUE``, rebuilds
    ``TOKEN_ADDRESSES`` from every token with a positive value in either
    wallet, and sends an HTML summary via ``send_telegram_message``.
    """
    global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE

    followed_wallet_balances, followed_token_addresses = await get_wallet_balances(FOLLOWED_WALLET)
    your_wallet_balances, your_token_addresses = await get_wallet_balances(YOUR_WALLET)

    all_token_addresses = list(set(followed_token_addresses + your_token_addresses))
    token_prices = await get_token_prices(all_token_addresses)
    sol_price = await get_sol_price()

    followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, token_prices, sol_price)
    your_converted_balances = await convert_balances_to_currency(your_wallet_balances, token_prices, sol_price)

    # Keep TOKEN_ADDRESSES a key -> mint-address mapping, which is how the
    # other functions in this module read it. Balance keys are mint addresses
    # already, except 'SOL', which maps to the wrapped SOL mint. (Storing the
    # converted amounts as values made later address lookups fail.)
    wrapped_sol_mint = "So11111111111111111111111111111111111111112"
    combined_balances = {**followed_converted_balances, **your_converted_balances}
    TOKEN_ADDRESSES = {
        token: (wrapped_sol_mint if token == 'SOL' else token)
        for token, amount in combined_balances.items()
        if amount is not None and amount > 0
    }
    logging.info(f"Monitoring balances for tokens: {TOKEN_ADDRESSES.keys()}")

    def _summarize(converted_balances):
        # Return (display lines, total value) over positively valued tokens.
        positive = {
            token: amount
            for token, amount in converted_balances.items()
            if amount is not None and amount > 0
        }
        lines = [f"{token}: {amount:.2f} {DISPLAY_CURRENCY}" for token, amount in positive.items()]
        return lines, sum(positive.values())

    followed_wallet_state, FOLLOWED_WALLET_VALUE = _summarize(followed_converted_balances)
    your_wallet_state, YOUR_WALLET_VALUE = _summarize(your_converted_balances)

    message = (
        f"<b>Initial Wallet States (All balances in {DISPLAY_CURRENCY}):</b>\n\n"
        f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
        f"{chr(10).join(followed_wallet_state)}\n"
        f"<b>Total Value:</b> {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
        f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
        f"{chr(10).join(your_wallet_state)}\n"
        f"<b>Total Value:</b> {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n"
        f"<b>Monitored Tokens:</b>\n"
        f"{', '.join(TOKEN_ADDRESSES.keys())}"
    )

    logging.info(message)
    await send_telegram_message(message)
async def follow_move(move):
    """Mirror a detected swap from the followed wallet in the user's wallet.

    Args:
        move: Dict with keys ``'token'`` (token being sold), ``'amount'``
            (amount sold by the followed wallet) and ``'to_token'`` (token
            being bought).

    The amount to swap is ``move['amount']`` scaled by the ratio of the
    user's balance to the followed wallet's balance of the sold token.
    Outcomes (success, failure, insufficient balance) are logged and sent
    to Telegram; nothing is returned.

    NOTE(review): balances here are UI amounts, while the caller in this
    module passes the raw ``amount_in`` parsed from logs — confirm both use
    the same units.
    """
    # get_wallet_balances returns (balances, token_addresses); only the
    # balances dict is needed. Using the tuple directly made every token
    # look invalid.
    followed_balances, _ = await get_wallet_balances(FOLLOWED_WALLET)
    your_balances, _ = await get_wallet_balances(YOUR_WALLET)

    token = move['token']
    if token not in followed_balances or token not in your_balances:
        logging.error(f"Invalid token: {move['token']}")
        return

    followed_balance = followed_balances[token]
    your_balance = your_balances[token]

    proportion = your_balance / followed_balance if followed_balance > 0 else 0
    amount_to_swap = move['amount'] * proportion

    if amount_to_swap <= 0:
        # Requesting a zero-sized swap is pointless; skip it.
        logging.warning(f"Nothing to swap for {token}: computed amount is {amount_to_swap}")
        return

    if your_balance < amount_to_swap:
        message = (
            f"<b>Move Failed:</b>\n"
            f"Insufficient balance to swap {amount_to_swap:.6f} {move['token']}"
        )
        logging.warning(message)
        await send_telegram_message(message)
        return

    # Perform the swap using Jupiter API
    try:
        # perform_swap does blocking HTTP calls; run it in the default
        # executor so the event loop keeps servicing the websocket.
        loop = asyncio.get_running_loop()
        swap_result = await loop.run_in_executor(
            None, perform_swap, token, move['to_token'], amount_to_swap
        )

        if swap_result['success']:
            message = (
                f"<b>Move Followed:</b>\n"
                f"Swapped {amount_to_swap:.6f} {move['token']} "
                f"for {swap_result['outputAmount']:.6f} {move['to_token']}"
            )
            logging.info(message)
        else:
            message = (
                f"<b>Swap Failed:</b>\n"
                f"Error: {swap_result['error']}"
            )
            logging.warning(message)

        await send_telegram_message(message)
    except Exception as e:
        error_message = f"<b>Swap Error:</b>\n{str(e)}"
        logging.error(error_message)
        await send_telegram_message(error_message)
def perform_swap(input_token, output_token, amount):
    """Request a quote and a swap from the Jupiter API (blocking).

    Args:
        input_token: Mint of the token to sell.
        output_token: Mint of the token to buy.
        amount: Amount to sell; it is multiplied by 10**9 before being sent.

    Returns:
        ``{"success": True, "outputAmount": float}`` on success, otherwise
        ``{"success": False, "error": str}``. HTTP failures and responses
        that lack the expected fields both produce the failure form instead
        of raising.

    NOTE(review): the 10**9 scaling is only right for tokens with 9
    decimals — confirm for other mints.
    NOTE(review): the returned transaction is never signed or submitted
    here, so "success" only means the API calls succeeded.
    """
    request_timeout = 10  # seconds; without it a stalled API call hangs forever

    # Jupiter API endpoint
    url = "https://quote-api.jup.ag/v4/quote"

    # Parameters for the API request
    params = {
        "inputMint": input_token,
        "outputMint": output_token,
        "amount": int(amount * 10**9),  # Convert to lamports
        "slippageBps": 50,  # 0.5% slippage
    }

    try:
        response = requests.get(url, params=params, timeout=request_timeout)
        response.raise_for_status()
        quote = response.json()

        # Get the best route
        route = quote['data'][0]

        # Perform the swap
        swap_url = "https://quote-api.jup.ag/v4/swap"
        swap_data = {
            "quoteResponse": route,
            "userPublicKey": YOUR_WALLET,
            "wrapUnwrapSOL": True
        }

        swap_response = requests.post(swap_url, json=swap_data, timeout=request_timeout)
        swap_response.raise_for_status()
        swap_result = swap_response.json()

        # Sign and send the transaction (this part depends on your wallet setup)
        # For simplicity, we'll assume the transaction is successful
        return {
            "success": True,
            "outputAmount": float(swap_result['outputAmount']) / 10**9  # Convert from lamports
        }

    except requests.exceptions.RequestException as e:
        return {
            "success": False,
            "error": str(e)
        }
    except (KeyError, IndexError, TypeError, ValueError) as e:
        # The API answered, but not with the structure read above.
        return {
            "success": False,
            "error": f"Unexpected Jupiter response: {e!r}"
        }
from base58 import b58decode
from solders.pubkey import Pubkey
from solders.transaction import Transaction
from solders.signature import Signature
async def get_transaction_details_rpc(tx_signature):
    """Fetch a transaction through a raw ``getTransaction`` JSON-RPC call.

    Args:
        tx_signature: Transaction signature string.

    Returns:
        The ``result`` member of the RPC response (``jsonParsed`` encoding),
        or ``None`` when the request fails, the body is not valid JSON, or
        the response carries no ``result``. Failures are logged.

    NOTE(review): ``requests.post`` blocks the event loop while it runs.
    """
    url = SOLANA_HTTP_URL
    # url = 'https://solana.drpc.org'
    headers = {"Content-Type": "application/json"}
    data = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getTransaction",
        "params": [
            tx_signature,
            {
                "encoding": "jsonParsed",
                "maxSupportedTransactionVersion": 0
            }
        ]
    }
    try:
        # The timeout keeps a stalled endpoint from hanging the caller forever.
        response = requests.post(url, headers=headers, data=json.dumps(data), timeout=10)
        response.raise_for_status()  # Raises an error for bad responses
        transaction_details = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        logging.error(f"Error fetching transaction details: {e}")
        return None

    if 'result' in transaction_details:
        logging.debug(f"Transaction details: {transaction_details['result']}")
        return transaction_details['result']

    logging.error(f"Unexpected response: {transaction_details}")
    return None
async def save_log(log):
    """Write *log* as indented JSON to ``./logs/log_<timestamp>.json``.

    The ``./logs`` directory is created when missing. The timestamp has
    microsecond resolution (``%Y%m%d_%H%M%S_%f``). Any failure is logged
    and swallowed.
    """
    try:
        os.makedirs('./logs', exist_ok=True)
        now = datetime.datetime.now()
        timestamp = now.strftime("%Y%m%d_%H%M%S_%f")
        target_path = f"./logs/log_{timestamp}.json"
        with open(target_path, 'w') as out_file:
            json.dump(log, out_file, indent=2)
    except Exception as save_error:
        logging.error(f"Error saving RPC log: {save_error}")
def determine_token(pubkey, watched_tokens):
    """Return the key in *watched_tokens* whose address equals *pubkey*.

    The first matching entry (in dict order) wins; ``"Unknown"`` is returned
    when nothing matches.
    """
    matching_tokens = (
        token for token, address in watched_tokens.items() if pubkey == address
    )
    return next(matching_tokens, "Unknown")
def parse_amount_from_logs(logs):
    """Extract the swap amounts from ``SwapEvent`` log lines.

    Args:
        logs: Iterable of log line strings.

    Returns:
        ``(amount_in, amount_out)`` as ints, taken from the last line that
        contains ``SwapEvent`` and an ``amount_in: N, amount_out: M`` pair.
        ``(0, 0)`` when no such line exists. Lines that mention
        ``SwapEvent`` without a parsable pair are skipped rather than
        raising ``IndexError``/``ValueError``.
    """
    import re  # local import: the module's import block does not provide `re`

    amount_in = 0
    amount_out = 0
    for log in logs:
        if 'SwapEvent' not in log:
            continue
        match = re.search(r'amount_in: (-?\d+), amount_out: (-?\d+)', log)
        if match is None:
            continue
        amount_in = int(match.group(1))
        amount_out = int(match.group(2))
    return amount_in, amount_out
def extract_swap_details(instruction, logs, watched_tokens):
    """Return ``(from_pubkey, to_pubkey, amount_in, amount_out)`` for an instruction.

    Args:
        instruction: Instruction dict; its first two ``accounts`` entries
            are taken as source and target.
        logs: Transaction log lines, passed to ``parse_amount_from_logs``.
        watched_tokens: Unused; kept so existing callers keep working.

    Returns:
        A 4-tuple. ``from_pubkey`` and ``to_pubkey`` are ``None`` when the
        instruction has no ``accounts`` list with at least two entries, so
        one such instruction no longer raises and aborts handling of the
        whole transaction.
    """
    # Extract source and target tokens along with amounts
    accounts = instruction.get('accounts') if isinstance(instruction, dict) else None
    if accounts is not None and len(accounts) >= 2:
        from_pubkey, to_pubkey = accounts[0], accounts[1]
    else:
        from_pubkey, to_pubkey = None, None

    amount_in, amount_out = parse_amount_from_logs(logs)
    return from_pubkey, to_pubkey, amount_in, amount_out
async def process_log(log_result):
    """Detect a swap in one log notification and follow it.

    Args:
        log_result: The ``result`` object of a ``logsSubscribe``
            notification; ``log_result['value']`` must provide ``err``,
            ``signature`` and ``logs``.

    Failed transactions and transactions whose logs contain no swap
    instruction are ignored. Otherwise the transaction is fetched over
    JSON-RPC and, for every instruction whose source or target account is
    one of the followed wallet's watched token addresses, a Telegram
    notice is sent and ``follow_move`` is invoked. Errors are logged, never
    raised.
    """
    if log_result['value']['err']:
        return

    tx_signature_str = log_result['value']['signature']
    logs = log_result['value']['logs']

    # Detect swap operations in logs
    swap_operations = ['Program log: Instruction: Swap', 'Program log: Instruction: Swap2']

    try:
        # Handle the transaction once, however many log lines mention a swap;
        # doing the work per matching line repeated every notification and
        # every follow_move call.
        is_swap = any(op in log_entry for log_entry in logs for op in swap_operations)
    except Exception as e:
        logging.error(f"Error processing log: {e}")
        return
    if not is_swap:
        return

    try:
        watched_tokens = await get_non_zero_token_balances(FOLLOWED_WALLET)

        # Transaction details come from a raw JSON-RPC call.
        transaction = await get_transaction_details_rpc(tx_signature_str)
        if not transaction:
            logging.error(f"Transaction not found: {tx_signature_str}")
            return

        instructions = transaction['transaction']['message']['instructions']
        watched_addresses = list(watched_tokens.values())

        for instruction in instructions:
            from_pubkey, to_pubkey, amount_in, amount_out = extract_swap_details(instruction, logs, watched_tokens)

            if from_pubkey not in watched_addresses and to_pubkey not in watched_addresses:
                continue

            from_token = determine_token(from_pubkey, watched_tokens)
            to_token = determine_token(to_pubkey, watched_tokens)

            move = {
                'token': from_token,
                'amount': amount_in,
                'to_token': to_token
            }
            message_text = (
                f"Swap detected:\n"
                f"From: {from_pubkey} ({from_token})\n"
                f"To: {to_pubkey} ({to_token})\n"
                f"Amount In: {amount_in}\n"
                f"Amount Out: {amount_out}"
            )
            await send_telegram_message(message_text)
            await follow_move(move)
    except Exception as e:
        logging.error(f"Error fetching transaction details: {e}")
        return
def parse_swap_logs(logs):
    """Extract swap amounts and token addresses from transaction log lines.

    Args:
        logs: Iterable of log line strings.

    Returns:
        A dict with keys ``source_token_address``,
        ``destination_token_address`` (empty strings when not found),
        ``amount_in`` and ``amount_out`` (0 when not found).

    Recognised lines:
      * ``SwapEvent`` lines containing ``amount_in: N, amount_out: M``.
      * Lines containing ``source_token_change: N`` and/or
        ``destination_token_change: M``; these override the amounts.
      * ``Program log: <address>`` lines whose single payload token looks
        like a base58 address; the first is taken as the source token, the
        second as the destination.

    Lines that do not match are ignored. The earlier tuple-unpacking parse
    raised on any ``Program log: ...`` prefixed change line, and the
    address check could never succeed (it compared against two
    whitespace-separated tokens where three are needed).
    """
    import re  # local import: the module's import block does not provide `re`

    base58_alphabet = set("123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz")

    def _looks_like_address(text):
        # Solana addresses are 32-44 base58 characters.
        return 32 <= len(text) <= 44 and set(text) <= base58_alphabet

    swap_details = {
        "source_token_address": "",
        "destination_token_address": "",
        "amount_in": 0,
        "amount_out": 0
    }

    for log in logs:
        if "SwapEvent" in log:
            # Extract amounts from SwapEvent
            event = re.search(r"amount_in: (-?\d+), amount_out: (-?\d+)", log)
            if event is not None:
                swap_details["amount_in"] = int(event.group(1))
                swap_details["amount_out"] = int(event.group(2))

        if "source_token_change:" in log:
            # Extract source and destination token changes
            source_change = re.search(r"source_token_change: (-?\d+)", log)
            if source_change is not None:
                swap_details["amount_in"] = int(source_change.group(1))
            destination_change = re.search(r"destination_token_change: (-?\d+)", log)
            if destination_change is not None:
                swap_details["amount_out"] = int(destination_change.group(1))

        # Extract token addresses (assuming they are logged as single entries)
        words = log.split()
        if len(words) == 3 and words[:2] == ["Program", "log:"] and _looks_like_address(words[2]):
            token_address = words[2]
            if not swap_details["source_token_address"]:
                swap_details["source_token_address"] = token_address
            elif not swap_details["destination_token_address"]:
                swap_details["destination_token_address"] = token_address

    return swap_details
async def on_logs(log):
    """Handle one log notification: debug-log it, save it to disk, then process it."""
    logging.debug("Received log: %s", log)
    await save_log(log)
    await process_log(log)
async def subscribe_to_wallet():
    """Subscribe to logs mentioning ``FOLLOWED_WALLET`` and dispatch them forever.

    Connects to ``SOLANA_WS_URL``, sends a ``logsSubscribe`` request at
    ``confirmed`` commitment and then reads messages in a loop:

      * a message with ``result`` is the subscription confirmation: the id
        is saved, a Telegram notice is sent and the initial wallet states
        are listed;
      * a message with ``params`` is a log notification and is handed to
        ``on_logs``.

    When the connection drops or an unexpected error occurs, the function
    sleeps and reconnects. The delay doubles after each attempt up to 60
    seconds and is reset to 5 seconds once a subscription is confirmed, so
    one bad period does not leave every later reconnect at the maximum.
    This coroutine never returns.
    """
    # Public endpoints that can be used for SOLANA_WS_URL include:
    #   wss://api.mainnet-beta.solana.com, wss://solana-api.projectserum.com,
    #   wss://rpc.ankr.com/solana, wss://mainnet.rpcpool.com
    uri = SOLANA_WS_URL  # wss://api.mainnet-beta.solana.com
    initial_reconnect_delay = 5  # Start with a 5-second delay
    max_reconnect_delay = 60  # Maximum delay of 60 seconds
    reconnect_delay = initial_reconnect_delay

    request = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "logsSubscribe",
        "params": [
            {
                "mentions": [FOLLOWED_WALLET]
            },
            {
                "commitment": "confirmed"
            }
        ]
    }

    while True:
        try:
            async with websockets.connect(uri) as websocket:
                logger.info("Connected to Solana websocket")

                await websocket.send(json.dumps(request))
                logger.info("Subscription request sent")

                while True:
                    try:
                        response = await websocket.recv()
                        response_data = json.loads(response)
                        logger.debug(f"Received response: {response_data}")

                        if 'result' in response_data:
                            subscription_id = response_data['result']
                            await save_subscription_id(subscription_id)
                            # The link is healthy again: restart the backoff.
                            reconnect_delay = initial_reconnect_delay
                            logger.info(f"Subscription successful. Subscription id: {subscription_id}")
                            await send_telegram_message("Connected to Solana network. Watching for transactions now.")
                            await list_initial_wallet_states()
                        elif 'params' in response_data:
                            await on_logs(response_data['params']['result'])
                        else:
                            logger.warning(f"Unexpected response: {response}")

                    except websockets.exceptions.ConnectionClosedError as e:
                        logger.error(f"Connection closed unexpectedly: {e}")
                        break
                    except json.JSONDecodeError as e:
                        # A single undecodable frame is skipped; keep reading.
                        logger.error(f"Failed to decode JSON: {e}")
                    except Exception as e:
                        logger.error(f"An unexpected error occurred: {e}")
                        break

        except websockets.exceptions.WebSocketException as e:
            logger.error(f"WebSocket error: {e}")
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")

        logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...")
        await asyncio.sleep(reconnect_delay)

        # Implement exponential backoff
        reconnect_delay = min(reconnect_delay * 2, max_reconnect_delay)
# Module-level logger. Functions defined earlier in this file that call
# `logger.*` look this name up when they run, not when they are defined.
logger = logging.getLogger(__name__)
async def main():
    """Configure DEBUG logging, announce startup on Telegram, then watch the wallet."""
    # Initialize logging
    logging.basicConfig(level=logging.DEBUG)

    startup_notice = "Solana Agent Started. Connecting to mainnet..."
    await send_telegram_message(startup_notice)
    await subscribe_to_wallet()
def run_flask():
    """Serve the Flask app on port 3001 with debug mode and the reloader disabled."""
    # Run Flask app without the reloader, so we can run the async main function
    flask_options = {"debug": False, "port": 3001, "use_reloader": False}
    app.run(**flask_options)
if __name__ == '__main__':
    # Start Flask in a separate thread. It is a daemon thread so that it
    # does not keep the process alive once the async main function exits.
    # The startup sequence appears once only: the former second copy tried
    # to start another Flask server and run main() again.
    flask_thread = threading.Thread(target=run_flask, daemon=True)
    flask_thread.start()

    # Run the async main function on a fresh event loop.
    asyncio.run(main())