# gogo2/crypto/sol/modules/SolanaAPI.py
# 2024-10-13 23:23:33 +03:00

# 239 lines
# 7.8 KiB
# Python

import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
import json
import logging
import random
import websockets
from typing import Optional
import requests
import datetime
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Candidate websocket URLs; SolanaAPI.connect picks one at random per attempt.
SOLANA_ENDPOINTS = ["your_endpoint_1", "your_endpoint_2"] # Add your endpoints here
# NOTE(review): not referenced anywhere in the visible code -- confirm whether
# a periodic re-subscription was intended.
SUBSCRIBE_INTERVAL = 300 # 5 minutes in seconds
from config import (
FOLLOWED_WALLET, SOLANA_HTTP_URL
)
class SolanaAPI:
    """Websocket client that follows the logs mentioning ``FOLLOWED_WALLET``.

    Typical life cycle: ``connect()`` -> ``subscribe()`` -> run
    ``receive_messages()`` and ``process_messages()`` concurrently ->
    ``unsubscribe()``.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self):
        # Open websocket connection, or None before the first connect().
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None
        # Id returned by ``logsSubscribe``; None while not subscribed.
        self.subscription_id: Optional[int] = None
        # Raw messages handed from receive_messages() to process_messages().
        self.message_queue: asyncio.Queue = asyncio.Queue()

    async def connect(self):
        """Connect to a randomly chosen endpoint, retrying every 5 seconds.

        Returns once a connection is established and stored in
        ``self.websocket``.

        Raises:
            ValueError: if ``SOLANA_ENDPOINTS`` is empty, since retrying
                cannot succeed in that case.
        """
        if not SOLANA_ENDPOINTS:
            raise ValueError("SOLANA_ENDPOINTS is empty; no websocket endpoint to connect to")
        while True:
            # Chosen outside the ``try`` so the error log below can always
            # name the endpoint that failed.
            current_url = random.choice(SOLANA_ENDPOINTS)
            try:
                self.websocket = await websockets.connect(current_url, ping_interval=30, ping_timeout=20)
            except asyncio.CancelledError:
                # Never turn a cancellation into another retry.
                raise
            except Exception as e:
                logger.error(f"Failed to connect to {current_url}: {e}")
                await asyncio.sleep(5)
            else:
                logger.info(f"Connected to Solana websocket: {current_url}")
                return

    async def subscribe(self):
        """Send ``logsSubscribe`` for ``FOLLOWED_WALLET`` and store the returned id.

        ``self.subscription_id`` is left as None when the response carries no
        ``result`` field.
        """
        # Drop any id from a previous connection so a failed subscription is
        # not mistaken for an active one.
        self.subscription_id = None
        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "logsSubscribe",
            "params": [
                {"mentions": [FOLLOWED_WALLET]},
                {"commitment": "confirmed"},
            ],
        }
        await self.websocket.send(json.dumps(request))
        response = await self.websocket.recv()
        response_data = json.loads(response)
        if 'result' in response_data:
            self.subscription_id = response_data['result']
            logger.info(f"Subscription successful. Subscription id: {self.subscription_id}")
        else:
            logger.warning(f"Unexpected response: {response_data}")

    async def unsubscribe(self):
        """Send ``logsUnsubscribe`` for the current subscription, if there is one.

        Best effort: the stored id is always cleared, and a missing or
        already closed connection is tolerated instead of raising.
        """
        # ``is None`` rather than truthiness: 0 is a legitimate integer id.
        if self.subscription_id is None:
            return
        subscription_id = self.subscription_id
        self.subscription_id = None
        if self.websocket is None:
            return
        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "logsUnsubscribe",
            "params": [subscription_id],
        }
        try:
            await self.websocket.send(json.dumps(request))
        except websockets.exceptions.ConnectionClosed:
            logger.warning(
                f"Could not unsubscribe from subscription id {subscription_id}: connection already closed"
            )
            return
        logger.info(f"Unsubscribed from subscription id: {subscription_id}")

    async def receive_messages(self):
        """Copy incoming websocket messages onto ``message_queue``.

        Returns (instead of raising) when the connection closes or any other
        receive error occurs.
        """
        while True:
            try:
                message = await self.websocket.recv()
                await self.message_queue.put(message)
            except websockets.exceptions.ConnectionClosed:
                # Base class of both the clean and the error close exceptions.
                logger.error("WebSocket connection closed")
                break
            except Exception as e:
                logger.error(f"Error receiving message: {e}")
                break

    async def process_messages(self):
        """Consume ``message_queue`` forever, passing each log result to ``process_log``.

        Messages without a ``params`` field are logged as unexpected.
        Decoding and processing errors are logged and do not stop the loop.
        """
        while True:
            message = await self.message_queue.get()
            try:
                response_data = json.loads(message)
                if 'params' in response_data:
                    log = response_data['params']['result']
                    await process_log(log)
                else:
                    logger.warning(f"Unexpected response: {response_data}")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to decode JSON: {e}")
            except Exception as e:
                logger.error(f"An unexpected error occurred while processing message: {e}")
            finally:
                # Always balance the get() above, even when processing failed.
                self.message_queue.task_done()
async def solana_jsonrpc(method, params = None, jsonParsed = True):
    """POST a JSON-RPC request to ``SOLANA_HTTP_URL`` and return its ``result``.

    Example of the request body this builds for ``getTransaction``::

        {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "getTransaction",
            "params": [
                tx_signature,
                {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}
            ]
        }

    Args:
        method: JSON-RPC method name.
        params: None (no positional arguments), a single value, or a list of
            positional arguments. The list is copied, never mutated.
        jsonParsed: when true, ``"encoding": "jsonParsed"`` is set in the
            config object.

    Returns:
        The ``result`` field of the response, or None when the request
        fails, the response has no ``result``, or it contains ``error``.
    """
    log = logging.getLogger(__name__)
    http_timeout_seconds = 30  # keep a stalled endpoint from hanging the caller forever

    # Normalise into a fresh list so the caller's argument is left untouched.
    if params is None:
        rpc_params = []
    elif isinstance(params, list):
        rpc_params = list(params)
    else:
        rpc_params = [params]

    # A trailing dict is treated as the method's config object and extended in
    # a copy; otherwise a new config object is appended.
    # NOTE(review): assumes a trailing dict is always the config object --
    # confirm for methods whose last positional argument is a filter object.
    if rpc_params and isinstance(rpc_params[-1], dict):
        config = dict(rpc_params[-1])
        rpc_params[-1] = config
    else:
        config = {}
        rpc_params.append(config)
    config.setdefault("maxSupportedTransactionVersion", 0)
    if jsonParsed:
        config["encoding"] = "jsonParsed"

    data = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": rpc_params,
    }

    def _post():
        # Runs in a worker thread: requests is blocking and would otherwise
        # stall every other coroutine on the event loop.
        return requests.post(
            SOLANA_HTTP_URL,
            headers={"Content-Type": "application/json"},
            data=json.dumps(data),
            timeout=http_timeout_seconds,
        )

    try:
        response = await asyncio.get_running_loop().run_in_executor(None, _post)
        response.raise_for_status()  # Raises an error for bad responses
        result = response.json()
        if 'result' not in result or 'error' in result:
            log.error(f"Error fetching data from Solana RPC: {result}")
            return None
        return result['result']
    except Exception as e:
        log.error(f"Error fetching data from Solana RPC: {e}")
        return None
async def process_log(log):
    """Placeholder hook for one log entry; currently does nothing.

    TODO: implement the log processing logic here.
    """
    return None
async def send_telegram_message(message):
    """Placeholder hook for sending a Telegram message; currently does nothing.

    TODO: implement the Telegram message sending logic here.
    """
    return None
async def list_initial_wallet_states():
    """Placeholder hook for listing initial wallet states; currently does nothing.

    TODO: implement the initial wallet state listing logic here.
    """
    return None
async def wallet_watch_loop():
    """Keep a log subscription running, reconnecting whenever it stops.

    Each cycle connects, subscribes, announces the connection through
    ``send_telegram_message`` and runs the receive and process workers. As
    soon as either worker stops, the cycle is torn down and a new one starts
    after 5 seconds. ``list_initial_wallet_states`` is scheduled once, after
    the first successful subscription.

    Runs until cancelled; cancellation is propagated after the connection
    has been cleaned up.
    """
    solana_ws = SolanaAPI()
    first_subscription = True
    # Strong reference: the event loop only keeps weak references to tasks, so
    # an unreferenced background task may be garbage collected mid-run.
    initial_state_task = None

    while True:
        workers = []
        try:
            await solana_ws.connect()
            await solana_ws.subscribe()

            if first_subscription:
                initial_state_task = asyncio.create_task(list_initial_wallet_states())
                first_subscription = False

            await send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...")

            workers = [
                asyncio.create_task(solana_ws.receive_messages()),
                asyncio.create_task(solana_ws.process_messages()),
            ]
            # process_messages() never returns on its own, so waiting for
            # both workers would block forever after the socket drops. Wake
            # up as soon as the first one finishes.
            await asyncio.wait(workers, return_when=asyncio.FIRST_COMPLETED)
        except asyncio.CancelledError:
            # Let cancellation end the loop; ``finally`` still cleans up.
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred: {e}")
        finally:
            # Stop whichever worker is still running and wait for it to
            # unwind; gathering also retrieves any worker exception.
            for worker in workers:
                worker.cancel()
            if workers:
                outcomes = await asyncio.gather(*workers, return_exceptions=True)
                for outcome in outcomes:
                    if isinstance(outcome, Exception):
                        logger.error(f"An unexpected error occurred: {outcome}")

            # Teardown is best effort: a failure here must not end the loop.
            try:
                await solana_ws.unsubscribe()
            except Exception as e:
                logger.warning(f"Failed to unsubscribe cleanly: {e}")
            if solana_ws.websocket:
                try:
                    await solana_ws.websocket.close()
                except Exception as e:
                    logger.warning(f"Failed to close websocket cleanly: {e}")

        # Only reached when the cycle ended without cancellation.
        try:
            await send_telegram_message("Reconnecting...")
        except Exception as e:
            logger.error(f"Failed to send reconnect notice: {e}")
        await asyncio.sleep(5)
# Example usage:
# async def main():
#     account_address = "Vote111111111111111111111111111111111111111"
#     await get_last_transactions(account_address)
async def get_last_transactions(account_address, check_interval=300, limit=1000):
    """Poll ``getSignaturesForAddress`` forever and process newly seen signatures.

    The first poll happens immediately; later polls happen once at least
    ``check_interval`` seconds have passed since the previous one. Every
    returned entry is passed to ``process_transaction`` in the order the RPC
    returned it.

    Args:
        account_address: address whose signatures are requested.
        check_interval: minimum number of seconds between two polls.
        limit: value sent as the ``limit`` option of each request.

    Never returns; cancel the task to stop it.
    """
    # The event loop's monotonic clock is unaffected by wall-clock changes.
    clock = asyncio.get_running_loop().time
    last_check_time = None
    last_signature = None

    while True:
        current_time = clock()
        is_due = last_check_time is None or current_time - last_check_time >= check_interval
        if is_due:
            options = {"limit": limit}
            if last_signature:
                # ``until`` stops the search at the last signature already
                # handled, so only newer entries come back. ``before`` would
                # page further into older history instead.
                # NOTE: if more than ``limit`` new entries appeared since the
                # previous poll, the oldest of them are not returned.
                options["until"] = last_signature

            result = await solana_jsonrpc("getSignaturesForAddress", [account_address, options])

            if result:
                for signature in result:
                    # Safety net in case the endpoint includes the boundary entry.
                    if last_signature and signature['signature'] == last_signature:
                        break
                    # Process the transaction
                    await process_transaction(signature)
                # NOTE(review): assumes the first entry is the most recent -- confirm.
                last_signature = result[0]['signature']

            last_check_time = current_time

        await asyncio.sleep(1)  # Sleep for 1 second before checking again
async def process_transaction(signature):
    """Print the ``signature`` field of one signature entry.

    Extension point: storage, notifications and similar processing can be
    added here.
    """
    signature_id = signature['signature']
    print(f"Processing transaction: {signature_id}")
if __name__ == "__main__":
    # Script entry point: run the wallet watcher on a fresh event loop.
    asyncio.run(wallet_watch_loop())