gogo2/crypto/sol/app.py

205 lines
7.6 KiB
Python

from flask import Flask, render_template, request, jsonify
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Confirmed
from solders.pubkey import Pubkey
from dexscreener import DexscreenerClient
import asyncio
from telegram import Bot
from telegram.constants import ParseMode
import datetime
import logging
from solana.rpc.websocket_api import connect
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Confirmed
import websockets
import json
app = Flask(__name__)
# Use the production Solana RPC endpoint
solana_client = AsyncClient("https://api.mainnet-beta.solana.com")
dexscreener_client = DexscreenerClient()
# Configuration
DEVELOPER_CHAT_ID = "777826553"
FOLLOWED_WALLET = "9U7D916zuQ8qcL9kQZqkcroWhHGho5vD8VNekvztrutN" # traderrobot
YOUR_WALLET = "65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5"
TELEGRAM_BOT_TOKEN = "6805059978:AAHNJKuOeazMSJHc3-BXRCsFfEVyFHeFnjw"
# Initialize Telegram Bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)
# Initialize logging
logging.basicConfig(level=logging.DEBUG)
# Token addresses (initialize with some known tokens)
TOKEN_ADDRESSES = {
"SOL": "So11111111111111111111111111111111111111112",
"USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
"TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og"
}
async def send_telegram_message(message):
try:
await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML)
logging.info(f"Telegram message sent: {message}")
except Exception as e:
logging.error(f"Error sending Telegram message: {str(e)}")
async def get_token_balance(wallet_address, token_address):
try:
response = await solana_client.get_token_accounts_by_owner(
Pubkey.from_string(wallet_address),
{"mint": Pubkey.from_string(token_address)}
)
if response['result']['value']:
balance = await solana_client.get_token_account_balance(
response['result']['value'][0]['pubkey']
)
amount = float(balance['result']['value']['uiAmount'])
logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}")
return amount
else:
logging.debug(f"No account found for {token_address} in {wallet_address}")
return 0
except Exception as e:
logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)}")
return 0
async def get_wallet_balances(wallet_address):
balances = {}
logging.info(f"Getting balances for wallet: {wallet_address}")
for token, address in TOKEN_ADDRESSES.items():
balance = await get_token_balance(wallet_address, address)
balances[token] = balance
logging.debug(f"Balance for {token}: {balance}")
return balances
async def get_non_zero_token_balances(wallet_address):
non_zero_balances = {}
logging.info(f"Getting non-zero balances for wallet: {wallet_address}")
for token, address in TOKEN_ADDRESSES.items():
balance = await get_token_balance(wallet_address, address)
if balance > 0:
non_zero_balances[token] = address
logging.debug(f"Non-zero balance for {token}: {balance}")
return non_zero_balances
async def list_initial_wallet_states():
global TOKEN_ADDRESSES
followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
followed_non_zero = await get_non_zero_token_balances(FOLLOWED_WALLET)
your_non_zero = await get_non_zero_token_balances(YOUR_WALLET)
TOKEN_ADDRESSES = {**followed_non_zero, **your_non_zero}
logging.info(f"Monitoring balances for tokens: {TOKEN_ADDRESSES.keys()}")
followed_wallet_state = "\n".join([f"{token}: {amount:.6f}" for token, amount in followed_wallet_balances.items() if amount > 0])
your_wallet_state = "\n".join([f"{token}: {amount:.6f}" for token, amount in your_wallet_balances.items() if amount > 0])
message = (
"<b>Initial Wallet States (Non-zero balances):</b>\n\n"
f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
f"{followed_wallet_state}\n\n"
f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
f"{your_wallet_state}\n\n"
f"<b>Monitored Tokens:</b>\n"
f"{', '.join(TOKEN_ADDRESSES.keys())}"
)
logging.info(message)
await send_telegram_message(message)
async def follow_move(move):
followed_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_balances = await get_wallet_balances(YOUR_WALLET)
if move['token'] not in followed_balances or move['token'] not in your_balances:
logging.error(f"Invalid token: {move['token']}")
return
followed_balance = followed_balances[move['token']]
your_balance = your_balances[move['token']]
proportion = your_balance / followed_balance if followed_balance > 0 else 0
amount_to_swap = move['amount'] * proportion
if your_balance >= amount_to_swap:
# Implement actual swap logic here
pair = dexscreener_client.get_token_pair("solana", move['token'])
price = float(pair['priceUsd'])
received_amount = amount_to_swap * price
message = (
f"<b>Move Followed:</b>\n"
f"Swapped {amount_to_swap:.6f} {move['token']} "
f"for {received_amount:.6f} {move['to_token']}"
)
logging.info(message)
await send_telegram_message(message)
else:
message = (
f"<b>Move Failed:</b>\n"
f"Insufficient balance to swap {amount_to_swap:.6f} {move['token']}"
)
logging.warning(message)
await send_telegram_message(message)
async def on_logs(logs):
print(f"Received logs: {logs}")
if logs.value.err:
return
tx = logs.value.signature
tx_result = await solana_client.get_transaction(tx)
if tx_result and tx_result.value:
for instruction in tx_result.value.transaction.message.instructions:
if instruction.program_id == Pubkey.from_string(TOKEN_ADDRESSES['SOL']):
# This is a token transfer
from_pubkey = instruction.accounts[0]
to_pubkey = instruction.accounts[1]
amount = instruction.data[8:] # The amount is stored in the last 8 bytes
amount = int.from_bytes(amount, 'little') / 1e9 # Convert lamports to SOL
if from_pubkey == Pubkey.from_string(FOLLOWED_WALLET):
move = {
'token': 'SOL',
'amount': amount,
'to_token': 'Unknown' # You might want to determine this based on the receiving address
}
await follow_move(move)
async def subscribe_to_wallet():
uri = "wss://api.mainnet-beta.solana.com"
async with websockets.connect(uri) as websocket:
# Correct the `params` format to be an array
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "logsSubscribe",
"params": [
{"mentions": ["YOUR_WALLET_ADDRESS"], "commitment": "confirmed"}
]
}
await websocket.send(json.dumps(request))
# Listen for messages
while True:
response = await websocket.recv()
print(response)
async def main():
logging.basicConfig(level=logging.INFO)
await send_telegram_message("Solana Agent Application Started")
await list_initial_wallet_states()
await subscribe_to_wallet()
if __name__ == '__main__':
asyncio.run(main())