# gogo2/crypto/sol/app.py
#
# 227 lines
# 7.9 KiB
# Python

from flask import Flask, render_template, request, jsonify
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Confirmed
from solders.pubkey import Pubkey
from dexscreener import DexscreenerClient
import asyncio
from telegram import Bot
from telegram.constants import ParseMode
import os
import datetime
# Application-wide singletons shared by every view below.
app = Flask(__name__)
solana_client = AsyncClient("https://api.mainnet-beta.solana.com")
dexscreener_client = DexscreenerClient()

# Each setting below may be overridden with an environment variable of the
# same name; the literal is only the fallback used when the variable is unset,
# so existing deployments keep working unchanged.

# This can be your own ID, or one for a developer group/channel.
# You can use the /start command of this bot to see your chat id.
DEVELOPER_CHAT_ID = os.environ.get("DEVELOPER_CHAT_ID", "777826553")
# Replace with the wallet address you want to follow
FOLLOWED_WALLET = os.environ.get(
    "FOLLOWED_WALLET", "9U7D916zuQ8qcL9kQZqkcroWhHGho5vD8VNekvztrutN"
)
# Replace with your wallet address
YOUR_WALLET = os.environ.get(
    "YOUR_WALLET", "65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5"
)
# Channel DBot:
# TELEGRAM_CHAT_ID = "777826553"
# Telegram Bot Token
# t.me/kevin_ai_robot
# TOKEN = '6805059978:AAHNJKuOeazMSJHc3-BXRCsFfEVyFHeFnjw'
# t.me/artitherobot 6749075936:AAHUHiPTDEIu6JH7S2fQdibwsu6JVG3FNG0
# Replace with your Telegram Bot Token
# SECURITY: a bot token committed to source control should be treated as
# compromised. Revoke it and supply the replacement through the
# TELEGRAM_BOT_TOKEN environment variable rather than editing this literal.
TELEGRAM_BOT_TOKEN = os.environ.get(
    "TELEGRAM_BOT_TOKEN", "6805059978:AAHNJKuOeazMSJHc3-BXRCsFfEVyFHeFnjw"
)
# Initialize Telegram Bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)
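# NOTE(review): orphaned fragment left by a truncated paste; the definition
# that contained it is missing from this file. Original text:
#     t send_telegram_message(message)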
@app.route('/')
def index():
    """Render the ``index.html`` template for the site root."""
    landing_page = render_template('index.html')
    return landing_page
@app.route('/tokens', methods=['GET'])
async def get_tokens():
    """Return, as a JSON array, the token names tracked for ``YOUR_WALLET``."""
    holdings = await get_wallet_balances(YOUR_WALLET)
    # Iterating the mapping yields its keys in insertion order.
    token_names = [name for name in holdings]
    return jsonify(token_names)
@app.route('/balances', methods=['GET'])
async def get_balances():
    """Return the token -> balance mapping of ``YOUR_WALLET`` as JSON."""
    holdings = await get_wallet_balances(YOUR_WALLET)
    response = jsonify(holdings)
    return response
def _parse_move(payload):
    """Validate a follow-move request body.

    Returns a ``(token, to_token, amount)`` tuple when *payload* is a dict
    holding string ``token``/``to_token`` values and a numeric ``amount``;
    returns ``None`` for anything else.
    """
    if not isinstance(payload, dict):
        return None
    token = payload.get('token')
    to_token = payload.get('to_token')
    if not isinstance(token, str) or not isinstance(to_token, str):
        return None
    try:
        amount = float(payload.get('amount'))
    except (TypeError, ValueError):
        return None
    return token, to_token, amount


# NOTE: this must stay the only view named ``follow_move``. Flask refuses to
# register a second view function under an endpoint name that is already taken.
@app.route('/follow_move', methods=['POST'])
async def follow_move():
    """Mirror a move made by the followed wallet, scaled to this wallet.

    Expects a JSON body with ``token``, ``amount`` and ``to_token``. The
    requested amount is multiplied by ``your_balance / followed_balance`` for
    the source token (0 when the followed wallet holds none). The swap is only
    simulated: nothing is sent on-chain, a Telegram notification is issued and
    a summary is returned.

    Returns:
        A JSON object with ``status`` ("success" or "error") and ``message``.
        A malformed request body is answered with HTTP 400.
    """
    # silent=True yields None instead of aborting on a missing/invalid body,
    # so every bad request gets the same JSON error shape.
    parsed = _parse_move(request.get_json(silent=True))
    if parsed is None:
        return jsonify({'status': 'error', 'message': 'Invalid request'}), 400
    token, to_token, requested_amount = parsed

    followed_balances = await get_wallet_balances(FOLLOWED_WALLET)
    your_balances = await get_wallet_balances(YOUR_WALLET)
    if token not in followed_balances or token not in your_balances:
        return jsonify({'status': 'error', 'message': 'Invalid token'})

    followed_balance = followed_balances[token]
    your_balance = your_balances[token]
    # Guard against dividing by an empty (or negative) followed balance.
    proportion = your_balance / followed_balance if followed_balance > 0 else 0
    amount_to_swap = requested_amount * proportion

    if your_balance < amount_to_swap:
        failure = f"Insufficient balance to swap {amount_to_swap:.6f} {token}"
        await send_telegram_message(f"<b>Move Failed:</b>\n{failure}")
        return jsonify({'status': 'error', 'message': failure})

    # Here you would implement the actual swap logic.
    # For now, the swap is only simulated from the quoted USD price.
    # NOTE(review): subscripting the result assumes the client returns a
    # mapping with a 'priceUsd' key - confirm against the dexscreener client.
    pair = dexscreener_client.get_token_pair("solana", token)
    price = float(pair['priceUsd'])
    # NOTE(review): this is the USD value of the swapped amount, reported as a
    # quantity of ``to_token`` - presumably only right for USD-pegged targets.
    received_amount = amount_to_swap * price

    summary = (
        f"Swapped {amount_to_swap:.6f} {token} "
        f"for {received_amount:.6f} {to_token}"
    )
    await send_telegram_message(f"<b>Move Followed:</b>\n{summary}")
    return jsonify({'status': 'success', 'message': summary})


@app.route('/followed_wallet_moves', methods=['GET'])
async def get_followed_wallet_moves():
    """Report the latest move of the followed wallet (currently simulated).

    Sends a Telegram notification describing the move and returns the move
    as JSON with ``token``, ``action``, ``amount`` and ``to_token`` keys.
    """
    # In a real-world scenario, you'd use a blockchain explorer API to get
    # recent transactions. For this example, a move is simulated.
    simulated_move = {
        "token": "SOL",
        "action": "swap",
        "amount": 5,
        "to_token": "USDC",
    }
    # Awaited directly (like the other async views) instead of spinning up a
    # fresh event loop with asyncio.run() inside the request handler.
    await send_telegram_message(
        f"<b>Move Detected:</b>\n"
        f"Followed wallet swapped {simulated_move['amount']} {simulated_move['token']} "
        f"to {simulated_move['to_token']}"
    )
    return jsonify(simulated_move)
async def get_wallet_balances(wallet_address):
    """Return a token -> balance dict for *wallet_address*.

    One ``get_token_balance`` lookup is awaited per entry of
    ``TOKEN_ADDRESSES``, sequentially and in that mapping's order.
    """
    return {
        symbol: await get_token_balance(wallet_address, mint)
        for symbol, mint in TOKEN_ADDRESSES.items()
    }
async def send_telegram_message(message):
    """Telegram notification hook; currently a no-op.

    Delivery is switched off: the coroutine resolves to ``None`` without
    contacting Telegram. The disabled call is kept below for reference.
    """
    # await bot.send_message(chat_id=TELEGRAM_CHAT_ID, text=message, parse_mode=ParseMode.HTML)
    return None
async def get_non_zero_token_balances(wallet_address):
    """Return token -> address for tokens *wallet_address* actually holds.

    A token is kept when its ``get_token_balance`` result is greater than
    zero. The values are the token addresses from ``TOKEN_ADDRESSES``, not
    the balances.
    """
    return {
        symbol: mint
        for symbol, mint in TOKEN_ADDRESSES.items()
        if await get_token_balance(wallet_address, mint) > 0
    }
async def list_initial_wallet_states():
    """Report both wallets' non-zero balances and narrow the monitored tokens.

    Fetches the balances of ``FOLLOWED_WALLET`` and ``YOUR_WALLET``, rebinds
    the module-level ``TOKEN_ADDRESSES`` to only the tokens that at least one
    of the two wallets holds, and sends a Telegram summary.
    """
    global TOKEN_ADDRESSES
    followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
    your_wallet_balances = await get_wallet_balances(YOUR_WALLET)

    # Derive the non-zero token -> address maps from the balances fetched
    # above rather than querying every token balance a second time.
    known_addresses = TOKEN_ADDRESSES
    followed_non_zero = {
        token: known_addresses[token]
        for token, amount in followed_wallet_balances.items()
        if amount > 0
    }
    your_non_zero = {
        token: known_addresses[token]
        for token, amount in your_wallet_balances.items()
        if amount > 0
    }
    # Update TOKEN_ADDRESSES with non-zero balances from both wallets
    TOKEN_ADDRESSES = {**followed_non_zero, **your_non_zero}

    followed_wallet_state = "\n".join(
        f"{token}: {amount:.6f}"
        for token, amount in followed_wallet_balances.items()
        if amount > 0
    )
    your_wallet_state = "\n".join(
        f"{token}: {amount:.6f}"
        for token, amount in your_wallet_balances.items()
        if amount > 0
    )
    message = (
        "<b>Initial Wallet States (Non-zero balances):</b>\n\n"
        f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
        f"{followed_wallet_state}\n\n"
        f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
        f"{your_wallet_state}\n\n"
        f"<b>Monitored Tokens:</b>\n"
        f"{', '.join(TOKEN_ADDRESSES)}"
    )
    await send_telegram_message(message)
async def send_startup_message():
    """Send a Telegram notice that the application has started.

    The notice carries the local start time and both wallet addresses.
    """
    started_at = format(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S")
    notice = "".join((
        "<b>Solana Agent Application Started</b>\n\n",
        f"Startup Time: {started_at}\n",
        f"Followed Wallet: {FOLLOWED_WALLET}\n",
        f"Your Wallet: {YOUR_WALLET}\n\n",
        "The application is now running and ready to monitor wallet activities.",
    ))
    await send_telegram_message(notice)
if __name__ == '__main__':
    # Run the startup coroutines one after another, each on its own event
    # loop, before handing control to the Flask development server.
    for startup_step in (send_startup_message, list_initial_wallet_states):
        asyncio.run(startup_step())
    monitored = ', '.join(TOKEN_ADDRESSES.keys())
    print(f"Monitored Tokens: {monitored}")
    app.run(debug=True, port=3009)