gogo2/crypto/sol/app.py
2024-10-02 16:23:17 +03:00

207 lines
7.6 KiB
Python

import asyncio
import websockets
import json
from flask import Flask, render_template, request, jsonify
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Confirmed
from solders.pubkey import Pubkey
from dexscreener import DexscreenerClient
from telegram import Bot
from telegram.constants import ParseMode
import datetime
import logging
from solana.rpc.websocket_api import connect
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Confirmed
import os
from dotenv import load_dotenv
load_dotenv()
app = Flask(__name__)
# Use the production Solana RPC endpoint
solana_client = AsyncClient("https://api.mainnet-beta.solana.com")
dexscreener_client = DexscreenerClient()
# Configuration
DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID")
FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET")
YOUR_WALLET = os.getenv("YOUR_WALLET")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
# Initialize Telegram Bot
bot = Bot(token=TELEGRAM_BOT_TOKEN)
# Initialize logging
logging.basicConfig(level=logging.DEBUG)
# Token addresses (initialize with some known tokens)
TOKEN_ADDRESSES = {
"SOL": "So11111111111111111111111111111111111111112",
"USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",
"TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og"
}
async def send_telegram_message(message):
try:
await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML)
logging.info(f"Telegram message sent: {message}")
except Exception as e:
logging.error(f"Error sending Telegram message: {str(e)}")
async def get_token_balance(wallet_address, token_address):
try:
response = await solana_client.get_token_accounts_by_owner(
Pubkey.from_string(wallet_address),
{"mint": Pubkey.from_string(token_address)}
)
if response['result']['value']:
balance = await solana_client.get_token_account_balance(
response['result']['value'][0]['pubkey']
)
amount = float(balance['result']['value']['uiAmount'])
logging.debug(f"Balance for {token_address} in {wallet_address}: {amount}")
return amount
else:
logging.debug(f"No account found for {token_address} in {wallet_address}")
return 0
except Exception as e:
logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)}")
return 0
async def get_wallet_balances(wallet_address):
balances = {}
logging.info(f"Getting balances for wallet: {wallet_address}")
for token, address in TOKEN_ADDRESSES.items():
balance = await get_token_balance(wallet_address, address)
balances[token] = balance
logging.debug(f"Balance for {token}: {balance}")
return balances
async def get_non_zero_token_balances(wallet_address):
non_zero_balances = {}
logging.info(f"Getting non-zero balances for wallet: {wallet_address}")
for token, address in TOKEN_ADDRESSES.items():
balance = await get_token_balance(wallet_address, address)
if balance > 0:
non_zero_balances[token] = address
logging.debug(f"Non-zero balance for {token}: {balance}")
return non_zero_balances
async def list_initial_wallet_states():
global TOKEN_ADDRESSES
followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_wallet_balances = await get_wallet_balances(YOUR_WALLET)
followed_non_zero = await get_non_zero_token_balances(FOLLOWED_WALLET)
your_non_zero = await get_non_zero_token_balances(YOUR_WALLET)
TOKEN_ADDRESSES = {**followed_non_zero, **your_non_zero}
logging.info(f"Monitoring balances for tokens: {TOKEN_ADDRESSES.keys()}")
followed_wallet_state = "\n".join([f"{token}: {amount:.6f}" for token, amount in followed_wallet_balances.items() if amount > 0])
your_wallet_state = "\n".join([f"{token}: {amount:.6f}" for token, amount in your_wallet_balances.items() if amount > 0])
message = (
"<b>Initial Wallet States (Non-zero balances):</b>\n\n"
f"<b>Followed Wallet ({FOLLOWED_WALLET}):</b>\n"
f"{followed_wallet_state}\n\n"
f"<b>Your Wallet ({YOUR_WALLET}):</b>\n"
f"{your_wallet_state}\n\n"
f"<b>Monitored Tokens:</b>\n"
f"{', '.join(TOKEN_ADDRESSES.keys())}"
)
logging.info(message)
await send_telegram_message(message)
async def follow_move(move):
followed_balances = await get_wallet_balances(FOLLOWED_WALLET)
your_balances = await get_wallet_balances(YOUR_WALLET)
if move['token'] not in followed_balances or move['token'] not in your_balances:
logging.error(f"Invalid token: {move['token']}")
return
followed_balance = followed_balances[move['token']]
your_balance = your_balances[move['token']]
proportion = your_balance / followed_balance if followed_balance > 0 else 0
amount_to_swap = move['amount'] * proportion
if your_balance >= amount_to_swap:
# Implement actual swap logic here
pair = dexscreener_client.get_token_pair("solana", move['token'])
price = float(pair['priceUsd'])
received_amount = amount_to_swap * price
message = (
f"<b>Move Followed:</b>\n"
f"Swapped {amount_to_swap:.6f} {move['token']} "
f"for {received_amount:.6f} {move['to_token']}"
)
logging.info(message)
await send_telegram_message(message)
else:
message = (
f"<b>Move Failed:</b>\n"
f"Insufficient balance to swap {amount_to_swap:.6f} {move['token']}"
)
logging.warning(message)
await send_telegram_message(message)
async def on_logs(logs):
print(f"Received logs: {logs}")
if logs.value.err:
return
tx = logs.value.signature
tx_result = await solana_client.get_transaction(tx)
if tx_result and tx_result.value:
for instruction in tx_result.value.transaction.message.instructions:
if instruction.program_id == Pubkey.from_string(TOKEN_ADDRESSES['SOL']):
# This is a token transfer
from_pubkey = instruction.accounts[0]
to_pubkey = instruction.accounts[1]
amount = instruction.data[8:] # The amount is stored in the last 8 bytes
amount = int.from_bytes(amount, 'little') / 1e9 # Convert lamports to SOL
if from_pubkey == Pubkey.from_string(FOLLOWED_WALLET):
move = {
'token': 'SOL',
'amount': amount,
'to_token': 'Unknown' # You might want to determine this based on the receiving address
}
await follow_move(move)
async def subscribe_to_wallet():
uri = "wss://api.mainnet-beta.solana.com"
async with websockets.connect(uri) as websocket:
# Correct the `params` format to be an array
request = {
"jsonrpc": "2.0",
"id": 1,
"method": "logsSubscribe",
"params": [
{"mentions": [YOUR_WALLET], "commitment": "confirmed"}
]
}
await websocket.send(json.dumps(request))
# Listen for messages
while True:
response = await websocket.recv()
print(response)
async def main():
logging.basicConfig(level=logging.INFO)
await send_telegram_message("Solana Agent Application Started")
await list_initial_wallet_states()
await subscribe_to_wallet()
if __name__ == '__main__':
asyncio.run(main())