268 lines
9.2 KiB
Python
268 lines
9.2 KiB
Python
import sys
|
|
import os
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import random
|
|
import websockets
|
|
from typing import Optional
|
|
import requests
|
|
import datetime
|
|
|
|
logger = logging.getLogger(__name__)

# Websocket RPC endpoints; SolanaWS.connect() picks one at random per attempt.
# The commented-out entries are alternates kept for reference.
SOLANA_ENDPOINTS = [
    "wss://api.mainnet-beta.solana.com",
    # "wss://solana-api.projectserum.com",
    # "wss://rpc.ankr.com/solana",
    # "wss://mainnet.rpcpool.com",
]

# Seconds; intended websocket keepalive ping interval.
PING_INTERVAL = 30

# Seconds (one minute). The previous note said "every 10 minutes", which did
# not match the value. Not referenced anywhere else in this module.
SUBSCRIBE_INTERVAL = 1 * 60
|
|
|
|
from config import (
|
|
FOLLOWED_WALLET, SOLANA_HTTP_URL
|
|
)
|
|
|
|
from modules.utils import telegram_utils
|
|
|
|
class SolanaWS:
    """Client for a Solana JSON-RPC websocket that follows one wallet's logs.

    Typical flow: ``connect()`` -> ``subscribe()`` -> run ``receive_messages()``
    (socket -> queue) and ``process_messages()`` (queue -> ``on_message``)
    side by side -> ``unsubscribe()`` / ``close()``.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, on_message: Optional[callable] = None):
        """Create an unconnected client.

        Args:
            on_message: Optional coroutine function awaited once per queued
                frame by the ``process_*`` consumers.
        """
        # Live connection object; set by connect().
        self.websocket = None
        # Id returned by logsSubscribe, or None when not subscribed.
        self.subscription_id = None
        # Raw frames read from the socket and not yet handled.
        self.message_queue = asyncio.Queue()
        self.on_message = on_message

    async def connect(self):
        """Connect to a random endpoint, retrying every 5 seconds until it works."""
        while True:
            # Chosen outside the ``try`` so the error log below can never hit
            # an unbound ``current_url``.
            current_url = random.choice(SOLANA_ENDPOINTS)
            try:
                self.websocket = await websockets.connect(
                    current_url, ping_interval=PING_INTERVAL, ping_timeout=20
                )
            except Exception as e:
                logger.error(f"Failed to connect to {current_url}: {e}")
                await asyncio.sleep(5)
            else:
                logger.info(f"Connected to Solana websocket: {current_url}")
                return

    async def ws_jsonrpc(self, method, params=None):
        """Send one JSON-RPC request over the socket and return its ``result``.

        Subscription notifications (frames with a ``method`` but no ``id``)
        that arrive before the reply are pushed onto ``message_queue`` instead
        of being mistaken for the reply and dropped.

        NOTE(review): this reads from the same socket as ``receive_messages``;
        do not call it while that coroutine is running.

        Args:
            method: JSON-RPC method name.
            params: A list of parameters, a single parameter, or None.

        Returns:
            The ``result`` member of the reply, or None when the reply carries
            an ``error`` or has an unexpected shape.
        """
        if not isinstance(params, list):
            params = [params] if params is not None else []

        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": method,
            "params": params,
        }
        await self.websocket.send(json.dumps(request))

        while True:
            raw = await self.websocket.recv()
            response_data = json.loads(raw)
            is_notification = (
                isinstance(response_data, dict)
                and "id" not in response_data
                and "method" in response_data
            )
            if not is_notification:
                break
            # Keep the notification for the regular consumers.
            await self.message_queue.put(raw)

        if 'result' in response_data:
            return response_data['result']
        if 'error' in response_data:
            logger.error(f"Error in WebSocket RPC call: {response_data['error']}")
            return None
        logger.warning(f"Unexpected response: {response_data}")
        return None

    async def subscribe(self):
        """Subscribe to confirmed logs mentioning ``FOLLOWED_WALLET``.

        Stores the returned id in ``subscription_id``; on failure the id is
        reset to None so a value from an earlier connection is not reused.
        """
        params = [
            {"mentions": [FOLLOWED_WALLET]},
            {"commitment": "confirmed"},
        ]
        result = await self.ws_jsonrpc("logsSubscribe", params)
        if result is not None:
            self.subscription_id = result
            logger.info(f"Subscription successful. Subscription id: {self.subscription_id}")
        else:
            self.subscription_id = None
            logger.error("Failed to subscribe")

    async def unsubscribe(self):
        """Cancel the current log subscription, if there is one."""
        # ``is None`` rather than truthiness: 0 is a usable subscription id.
        if self.subscription_id is None:
            return
        result = await self.ws_jsonrpc("logsUnsubscribe", [self.subscription_id])
        if result:
            logger.info(f"Unsubscribed from subscription id: {self.subscription_id}")
            self.subscription_id = None
        else:
            logger.error(f"Failed to unsubscribe from subscription id: {self.subscription_id}")

    async def receive_messages(self):
        """Copy frames from the socket into ``message_queue`` until it fails.

        Returns (instead of raising) when the connection closes or any other
        receive error occurs, after logging it.
        """
        while True:
            try:
                message = await self.websocket.recv()
            except websockets.exceptions.ConnectionClosed:
                # Base class: covers both clean and abnormal closure.
                logger.error("WebSocket connection closed")
                break
            except Exception as e:
                logger.error(f"Error receiving message: {e}")
                break
            await self.message_queue.put(message)

    async def process_messages(self):
        """Forever: take a raw frame from the queue and await ``on_message(frame)``.

        A failing callback is logged and the loop continues, so one bad frame
        does not stop the consumer. Every frame is marked done on the queue.
        """
        while True:
            message = await self.message_queue.get()
            try:
                if self.on_message is not None:
                    await self.on_message(message)
                logger.info(f"Received message: {message}")
            except Exception as e:
                logger.error(f"An unexpected error occurred while processing message: {e}")
            finally:
                self.message_queue.task_done()

    async def process_log_messages(self):
        """Forever: decode each queued frame and forward its log payload.

        Variant of ``process_messages`` that parses the frame as JSON and
        awaits ``on_message`` with ``frame['params']['result']`` rather than
        with the raw text. Frames without ``params`` are logged and skipped.

        NOTE(review): this logic used to trail ``close()`` as a loop that never
        finished and called an undefined module-level ``process_log``; it now
        lives in its own method and uses ``self.on_message``. Confirm which
        consumer the application is meant to run.
        """
        while True:
            message = await self.message_queue.get()
            try:
                response_data = json.loads(message)
                if 'params' in response_data:
                    log = response_data['params']['result']
                    if self.on_message is not None:
                        await self.on_message(log)
                else:
                    logger.warning(f"Unexpected response: {response_data}")
            except json.JSONDecodeError as e:
                logger.error(f"Failed to decode JSON: {e}")
            except Exception as e:
                logger.error(f"An unexpected error occurred while processing message: {e}")
            finally:
                self.message_queue.task_done()

    async def close(self):
        """Close the websocket if one was opened."""
        if self.websocket:
            await self.websocket.close()
            logger.info("WebSocket connection closed")
|
|
|
|
|
|
def _build_rpc_payload(method, params=None, jsonParsed=True):
    """Build the JSON-RPC request body without touching the caller's objects.

    Example result for ``("getTransaction", tx_signature)``::

        {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "getTransaction",
            "params": [
                tx_signature,
                {"maxSupportedTransactionVersion": 0, "encoding": "jsonParsed"},
            ],
        }
    """
    if params is None:
        params = []
    elif not isinstance(params, list):
        # A single bare parameter becomes a one-element list.
        params = [params]
    else:
        # Shallow copy so the append/pop below never alters the caller's list.
        params = list(params)

    options = {"maxSupportedTransactionVersion": 0}
    if params and isinstance(params[-1], dict):
        # The caller already supplied a trailing config object: merge into a
        # fresh dict (caller's keys win) instead of appending a second one.
        options.update(params.pop())
    if jsonParsed:
        options["encoding"] = "jsonParsed"
    params.append(options)

    return {
        "jsonrpc": "2.0",
        "id": 1,
        "method": method,
        "params": params,
    }


async def solana_jsonrpc(method, params = None, jsonParsed = True):
    """POST one JSON-RPC call to ``SOLANA_HTTP_URL`` and return its ``result``.

    The blocking ``requests.post`` runs in the default executor so the event
    loop stays responsive, and it carries a 30 second timeout.

    Args:
        method: JSON-RPC method name.
        params: A list of parameters, a single parameter, or None.
        jsonParsed: When true, request ``"encoding": "jsonParsed"``.

    Returns:
        The ``result`` member of the reply, or None on any transport, HTTP or
        RPC-level error (the error is logged).
    """
    data = _build_rpc_payload(method, params, jsonParsed)

    try:
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(
            None,
            lambda: requests.post(
                SOLANA_HTTP_URL,
                headers={"Content-Type": "application/json"},
                data=json.dumps(data),
                timeout=30,
            ),
        )
        response.raise_for_status()  # Raises an error for bad responses
        result = response.json()
        if 'result' not in result or 'error' in result:
            logger.error(f"Error fetching data from Solana RPC: {result}")
            return None
        return result['result']
    except Exception as e:
        logger.error(f"Error fetching data from Solana RPC: {e}")
        return None
|
|
|
|
class SolanaAPI:
    """Holds the application callbacks used while watching Solana logs."""

    def __init__(self, process_log_callback, send_telegram_message_callback, list_initial_wallet_states_callback):
        """Store the three callbacks.

        Args:
            process_log_callback: Coroutine function awaited with each queued
                message by ``process_messages``.
            send_telegram_message_callback: Stored as ``send_telegram_message``
                (it was previously accepted but discarded).
            list_initial_wallet_states_callback: Stored as
                ``list_initial_wallet_states``.
        """
        self.process_log = process_log_callback
        self.send_telegram_message = send_telegram_message_callback
        self.list_initial_wallet_states = list_initial_wallet_states_callback

    async def process_messages(self, solana_ws):
        """Forever: take a message from ``solana_ws.message_queue`` and process it.

        An exception raised by ``process_log`` still propagates to the caller;
        the message is marked done on the queue either way.
        """
        while True:
            message = await solana_ws.message_queue.get()
            try:
                await self.process_log(message)
            finally:
                solana_ws.message_queue.task_done()
|
|
|
|
async def wallet_watch_loop(on_message=None, list_initial_wallet_states=None):
    """Keep a log subscription for the followed wallet alive, reconnecting forever.

    Each cycle connects, subscribes, announces the connection on Telegram and
    runs the receive/process coroutines of ``SolanaWS`` until either of them
    stops; it then tears the connection down, announces "Reconnecting..." and
    retries after 5 seconds. Cancelling this coroutine stops the loop.

    Args:
        on_message: Coroutine function handed to ``SolanaWS`` for every queued
            message. When omitted, a module-level ``process_log`` is used if
            one exists.
        list_initial_wallet_states: Optional coroutine function started once,
            in the background, after the first successful subscription.

    Raises:
        ValueError: No message callback is available.
    """
    if on_message is None:
        # The original code referred to a bare ``process_log`` name; honour it
        # when the host application defines one in this module.
        on_message = globals().get("process_log")
    if on_message is None:
        raise ValueError(
            "wallet_watch_loop needs an on_message callback "
            "(no module-level process_log is defined)"
        )

    solana_ws = SolanaWS(on_message=on_message)
    first_subscription = True
    # Strong references so background tasks are not garbage-collected mid-run.
    background_tasks = set()

    while True:
        worker_tasks = []
        try:
            await solana_ws.connect()
            await solana_ws.subscribe()
            if solana_ws.subscription_id is None:
                raise ConnectionError("logsSubscribe did not return a subscription id")

            if first_subscription:
                if list_initial_wallet_states is not None:
                    initial_task = asyncio.create_task(list_initial_wallet_states())
                    background_tasks.add(initial_task)
                    initial_task.add_done_callback(background_tasks.discard)
                first_subscription = False

            await telegram_utils.send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...")

            worker_tasks = [
                asyncio.create_task(solana_ws.receive_messages()),
                asyncio.create_task(solana_ws.process_messages()),
            ]
            # Stop as soon as EITHER worker ends: the receiver returns when the
            # socket closes while the processor would keep waiting on the
            # queue, so waiting for both would never finish.
            done, _ = await asyncio.wait(worker_tasks, return_when=asyncio.FIRST_COMPLETED)
            for finished in done:
                if not finished.cancelled() and finished.exception() is not None:
                    logger.error(f"An unexpected error occurred: {finished.exception()}")

        except Exception as e:
            # CancelledError is deliberately not caught here so that cancelling
            # the watcher really ends it (after the cleanup below).
            logger.error(f"An unexpected error occurred: {e}")
        finally:
            for task in worker_tasks:
                task.cancel()
            if worker_tasks:
                await asyncio.gather(*worker_tasks, return_exceptions=True)
            # Cleanup is best effort: the socket is often already dead here and
            # a failure must not escape the reconnect loop.
            try:
                await solana_ws.unsubscribe()
            except Exception as e:
                logger.warning(f"Failed to unsubscribe during cleanup: {e}")
            try:
                if solana_ws.websocket:
                    await solana_ws.websocket.close()
            except Exception as e:
                logger.warning(f"Failed to close websocket during cleanup: {e}")

        try:
            await telegram_utils.send_telegram_message("Reconnecting...")
        except Exception as e:
            logger.error(f"Failed to send reconnect notification: {e}")
        await asyncio.sleep(5)
|
|
|
|
async def process_transaction(signature):
    """Handle one signature record (placeholder implementation).

    Args:
        signature: Mapping with a ``'signature'`` key, as passed in by
            ``get_last_transactions``.

    Currently only prints the signature string. Real handling (storing in a
    database, triggering notifications, ...) belongs here.
    """
    tx_signature = signature['signature']
    print(f"Processing transaction: {tx_signature}")


# Example usage
# async def main():
#     account_address = "Vote111111111111111111111111111111111111111"
|
|
|
|
async def get_last_transactions(account_address, check_interval=300, limit=1000):
    """Poll forever for new transaction signatures of ``account_address``.

    The first poll runs immediately; later polls start ``check_interval``
    seconds after the previous one started (never less than 1 second apart).
    Each returned record is passed to ``process_transaction``, newest first.

    Args:
        account_address: Address whose signatures are fetched.
        check_interval: Seconds between the starts of consecutive polls.
        limit: ``limit`` option sent with ``getSignaturesForAddress``.
    """
    # The loop clock replaces the old ``datetime.now()`` call, which failed
    # because only the ``datetime`` *module* is imported in this file.
    loop = asyncio.get_running_loop()
    last_signature = None

    while True:
        poll_started = loop.time()

        options = {"limit": limit}
        if last_signature:
            # ``until`` stops the search at the newest signature already seen,
            # so only newer entries come back. The previous ``before`` cursor
            # paged further into older history instead.
            options["until"] = last_signature

        result = await solana_jsonrpc("getSignaturesForAddress", [account_address, options])

        if result:
            for signature in result:
                # Defensive: stop if the already-seen entry shows up anyway.
                if last_signature and signature['signature'] == last_signature:
                    break

                # Process the transaction
                await process_transaction(signature)

            # Entry 0 is treated as the newest one.
            last_signature = result[0]['signature']

        elapsed = loop.time() - poll_started
        await asyncio.sleep(max(1, check_interval - elapsed))
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Run as a script: start the wallet watcher and block until it ends.
    watcher = wallet_watch_loop()
    asyncio.run(watcher)