diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 8848e46..737bc4b 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -24,8 +24,7 @@ from solders.instruction import CompiledInstruction from solders import message from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA from dexscreener import DexscreenerClient -from telegram import Bot -from telegram.constants import ParseMode + import datetime import logging from logging.handlers import RotatingFileHandler @@ -41,35 +40,52 @@ from typing import List, Dict, Any, Tuple import random +from modules.webui import init_app +from modules.storage import init_db, store_transaction + app = Flask(__name__) -# config = load_config() -load_dotenv() -load_dotenv('.env.secret') -# Configuration -DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") -FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") -YOUR_WALLET = os.getenv("YOUR_WALLET") -TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") -SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") -SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") -DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') -BOT_NAME = os.getenv("BOT_NAME") +from config import ( + FOLLOWED_WALLET, + YOUR_WALLET, + SOLANA_WS_URL, + SOLANA_HTTP_URL, + DISPLAY_CURRENCY, + logger, + error_logger +) -logger = logging.getLogger(__name__) -logging.basicConfig(level=logging.DEBUG) -#logging.basicConfig(level=logging.INFO) +from modules.utils import (send_telegram_message, get_token_prices, get_sol_price, get_wallet_balances, convert_balances_to_currency, get_swap_transaction_details) -# Set up error logger -log_dir = './logs' -log_file = os.path.join(log_dir, 'error.log') -os.makedirs(log_dir, exist_ok=True) -error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5) -error_file_handler.setLevel(logging.ERROR) -error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') ) -error_logger = logging.getLogger('error_logger') -error_logger.setLevel(logging.ERROR) -error_logger.addHandler(error_file_handler) +from modules.SolanaAPI import SolanaAPI, solana_jsonrpc + +# # config = load_config() +# load_dotenv() +# load_dotenv('.env.secret') +# # Configuration +# DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") +# FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") +# YOUR_WALLET = os.getenv("YOUR_WALLET") +# TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") +# SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") +# SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") +# DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') +# BOT_NAME = os.getenv("BOT_NAME") + +# logger = logging.getLogger(__name__) +# logging.basicConfig(level=logging.DEBUG) +# #logging.basicConfig(level=logging.INFO) + +# # Set up error logger +# log_dir = './logs' +# log_file = os.path.join(log_dir, 'error.log') +# os.makedirs(log_dir, exist_ok=True) +# error_file_handler = RotatingFileHandler( log_file, maxBytes=10*1024*1024, backupCount=5) +# error_file_handler.setLevel(logging.ERROR) +# error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S') ) +# error_logger = logging.getLogger('error_logger') +# error_logger.setLevel(logging.ERROR) +# error_logger.addHandler(error_file_handler) # Function to find the latest log file @@ -114,12 +130,6 @@ async def retry_last_log(): # Create the bot with the custom connection pool bot = None -# Token addresses (initialize with some known tokens) -TOKEN_ADDRESSES = { - "SOL": "So11111111111111111111111111111111111111112", - "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", - "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og", -} TOKENS_INFO = {} try: @@ -129,13 +139,13 @@ except Exception as e: logging.error(f"Error loading token info: {str(e)}") # # # # # # # # # # TELEGRAM # # # # # # # # # # -async def send_telegram_message(message): - try: - await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) - logging.info(f"Telegram message sent: {message}") - # logging.info(f"Telegram message dummy sent: {message}") - except Exception as e: - logging.error(f"Error sending Telegram message: {str(e)}") +# async def send_telegram_message(message): +# try: +# await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) +# logging.info(f"Telegram message sent: {message}") +# # logging.info(f"Telegram message dummy sent: {message}") +# except Exception as e: +# logging.error(f"Error sending Telegram message: {str(e)}") @@ -437,6 +447,8 @@ async def get_token_metadata_symbol(mint_address): return None + + METADATA_STRUCT = CStruct( "update_authority" / String, "mint" / String, @@ -793,49 +805,6 @@ async def get_transaction_details_rpc(tx_signature, readfromDump=False): print("Error fetching transaction details:", e) -async def solana_jsonrpc(method, params = None, jsonParsed = True): - # target json example: - # data = { - # "jsonrpc": "2.0", - # "id": 1, - # "method": "getTransaction", - # "params": [ - # tx_signature, - # { - # "encoding": "jsonParsed", - # "maxSupportedTransactionVersion": 0 - # } - # ] - # } - # if param is not array, make it array - if not isinstance(params, list): - params = [params] - - data = { - "jsonrpc": "2.0", - "id": 1, - "method": method, - "params": params or [] - } - data["params"].append({"maxSupportedTransactionVersion": 0}) - if jsonParsed: - data["params"][1]["encoding"] = "jsonParsed" - - - try: - # url = 'https://solana.drpc.org' - response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) - response.raise_for_status() # Raises an error for bad responses - result = response.json() - if not 'result' in result or 'error' in result: - print("Error fetching data from Solana RPC:", result) - return None - return result['result'] - except Exception as e: - logging.error(f"Error fetching data from Solana RPC: {e}") - return None - - # # # # # # # # # # Functionality # # # # # # # # # # @@ -1429,12 +1398,7 @@ async def check_PK(): async def main(): global bot, PROCESSING_LOG - # Initialize Telegram Bot - # Create a custom connection pool - conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit - timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout - bot = Bot(TELEGRAM_BOT_TOKEN) # , request=aiohttp.ClientSession(connector=conn_pool, timeout=timeout).request) await send_telegram_message("Solana Agent Started. Connecting to mainnet...") await check_PK() @@ -1464,8 +1428,6 @@ async def main(): await send_telegram_message("Restarting wallet_watch_loop") -from modules.webui import init_app -from modules.storage import init_db async def run_flask(): # loop = asyncio.get_running_loop() diff --git a/crypto/sol/config.py b/crypto/sol/config.py new file mode 100644 index 0000000..e708df8 --- /dev/null +++ b/crypto/sol/config.py @@ -0,0 +1,59 @@ +# config.py + +import os +import logging +from dotenv import load_dotenv +from logging.handlers import RotatingFileHandler + +# Load environment variables +load_dotenv() +load_dotenv('.env.secret') + +# Configuration +DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") +FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") +YOUR_WALLET = os.getenv("YOUR_WALLET") +TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") +SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") +SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") +DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') +BOT_NAME = os.getenv("BOT_NAME") + +# Token addresses (initialize with some known tokens) +TOKEN_ADDRESSES = { + "SOL": "So11111111111111111111111111111111111111112", + "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", + "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og", +} + +# Logging configuration +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +# Set up error logger +log_dir = './logs' +log_file = os.path.join(log_dir, 'error.log') +os.makedirs(log_dir, exist_ok=True) +error_file_handler = RotatingFileHandler( + log_file, + maxBytes=10*1024*1024, + backupCount=5 +) +error_file_handler.setLevel(logging.ERROR) +error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) +error_logger = logging.getLogger('error_logger') +error_logger.setLevel(logging.ERROR) +error_logger.addHandler(error_file_handler) + +# Function to get all configuration +def get_config(): + return { + "DEVELOPER_CHAT_ID": DEVELOPER_CHAT_ID, + "FOLLOWED_WALLET": FOLLOWED_WALLET, + "YOUR_WALLET": YOUR_WALLET, + "TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN, + "SOLANA_WS_URL": SOLANA_WS_URL, + "SOLANA_HTTP_URL": SOLANA_HTTP_URL, + "DISPLAY_CURRENCY": DISPLAY_CURRENCY, + "BOT_NAME": BOT_NAME, + } \ No newline at end of file diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py new file mode 100644 index 0000000..ee4793b --- /dev/null +++ b/crypto/sol/modules/SolanaAPI.py @@ -0,0 +1,239 @@ +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import asyncio +import json +import logging +import random +import websockets +from typing import Optional +import requests +import datetime + +logger = logging.getLogger(__name__) + +SOLANA_ENDPOINTS = ["your_endpoint_1", "your_endpoint_2"] # Add your endpoints here +SUBSCRIBE_INTERVAL = 300 # 5 minutes in seconds + +from config import ( +FOLLOWED_WALLET, SOLANA_HTTP_URL +) + + +class SolanaAPI: + def __init__(self): + self.websocket: Optional[websockets.WebSocketClientProtocol] = None + self.subscription_id: Optional[int] = None + self.message_queue: asyncio.Queue = asyncio.Queue() + + async def connect(self): + while True: + try: + current_url = random.choice(SOLANA_ENDPOINTS) + self.websocket = await websockets.connect(current_url, ping_interval=30, ping_timeout=20) + logger.info(f"Connected to Solana websocket: {current_url}") + return + except Exception as e: + logger.error(f"Failed to connect to {current_url}: {e}") + await asyncio.sleep(5) + + async def subscribe(self): + request = { + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ + {"mentions": [FOLLOWED_WALLET]}, + {"commitment": "confirmed"} + ] + } + await self.websocket.send(json.dumps(request)) + response = await self.websocket.recv() + response_data = json.loads(response) + + if 'result' in response_data: + self.subscription_id = response_data['result'] + logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") + else: + logger.warning(f"Unexpected response: {response_data}") + + async def unsubscribe(self): + if self.subscription_id: + request = { + "jsonrpc": "2.0", + "id": 1, + "method": "logsUnsubscribe", + "params": [self.subscription_id] + } + await self.websocket.send(json.dumps(request)) + logger.info(f"Unsubscribed from subscription id: {self.subscription_id}") + self.subscription_id = None + + async def receive_messages(self): + while True: + try: + message = await self.websocket.recv() + await self.message_queue.put(message) + except websockets.exceptions.ConnectionClosedError: + logger.error("WebSocket connection closed") + break + except Exception as e: + logger.error(f"Error receiving message: {e}") + break + + async def process_messages(self): + while True: + message = await self.message_queue.get() + try: + response_data = json.loads(message) + if 'params' in response_data: + log = response_data['params']['result'] + await process_log(log) + else: + logger.warning(f"Unexpected response: {response_data}") + except json.JSONDecodeError as e: + logger.error(f"Failed to decode JSON: {e}") + except Exception as e: + logger.error(f"An unexpected error occurred while processing message: {e}") + finally: + self.message_queue.task_done() + + +async def solana_jsonrpc(method, params = None, jsonParsed = True): + # target json example: + # data = { + # "jsonrpc": "2.0", + # "id": 1, + # "method": "getTransaction", + # "params": [ + # tx_signature, + # { + # "encoding": "jsonParsed", + # "maxSupportedTransactionVersion": 0 + # } + # ] + # } + # if param is not array, make it array + if not isinstance(params, list): + params = [params] + + data = { + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params or [] + } + data["params"].append({"maxSupportedTransactionVersion": 0}) + if jsonParsed: + data["params"][1]["encoding"] = "jsonParsed" + + + try: + # url = 'https://solana.drpc.org' + response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) + response.raise_for_status() # Raises an error for bad responses + result = response.json() + if not 'result' in result or 'error' in result: + print("Error fetching data from Solana RPC:", result) + return None + return result['result'] + except Exception as e: + logging.error(f"Error fetching data from Solana RPC: {e}") + return None + + +async def process_log(log): + # Implement your log processing logic here + pass + +async def send_telegram_message(message): + # Implement your Telegram message sending logic here + pass + +async def list_initial_wallet_states(): + # Implement your initial wallet state listing logic here + pass + +async def wallet_watch_loop(): + solana_ws = SolanaAPI() + first_subscription = True + + while True: + try: + await solana_ws.connect() + await solana_ws.subscribe() + + if first_subscription: + asyncio.create_task(list_initial_wallet_states()) + first_subscription = False + + await send_telegram_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") + + receive_task = asyncio.create_task(solana_ws.receive_messages()) + process_task = asyncio.create_task(solana_ws.process_messages()) + + try: + await asyncio.gather(receive_task, process_task) + except asyncio.CancelledError: + pass + finally: + receive_task.cancel() + process_task.cancel() + + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + finally: + await solana_ws.unsubscribe() + if solana_ws.websocket: + await solana_ws.websocket.close() + await send_telegram_message("Reconnecting...") + await asyncio.sleep(5) + +# Example usage +# async def main(): +# account_address = "Vote111111111111111111111111111111111111111" + +async def get_last_transactions(account_address, check_interval=300, limit=1000): + last_check_time = None + last_signature = None + + while True: + current_time = datetime.now() + + if last_check_time is None or (current_time - last_check_time).total_seconds() >= check_interval: + params = [ + account_address, + { + "limit": limit + } + ] + + if last_signature: + params[1]["before"] = last_signature + + result = await solana_jsonrpc("getSignaturesForAddress", params) + + if result: + for signature in result: + if last_signature and signature['signature'] == last_signature: + break + + # Process the transaction + await process_transaction(signature) + + if result: + last_signature = result[0]['signature'] + + last_check_time = current_time + + await asyncio.sleep(1) # Sleep for 1 second before checking again + +async def process_transaction(signature): + # Implement your logic to process each transaction + print(f"Processing transaction: {signature['signature']}") + # You can add more processing logic here, such as storing in a database, + # triggering notifications, etc. + +if __name__ == "__main__": + asyncio.run(wallet_watch_loop()) \ No newline at end of file diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py new file mode 100644 index 0000000..684e99d --- /dev/null +++ b/crypto/sol/modules/utils.py @@ -0,0 +1,27 @@ +# telegram_utils.py +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import aiohttp +import logging +from telegram import Bot +from telegram.constants import ParseMode +from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME + +# Initialize Telegram Bot +# Create a custom connection pool +conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit +timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout + + # bot = Bot(TELEGRAM_BOT_TOKEN) # , request=aiohttp.ClientSession(connector=conn_pool, timeout=timeout).request) +bot = Bot(token=TELEGRAM_BOT_TOKEN) + +async def send_telegram_message(message): + try: + await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) + logging.info(f"Telegram message sent: {message}") + except Exception as e: + logging.error(f"Error sending Telegram message: {str(e)}") + +# You can add more Telegram-related functions here if needed \ No newline at end of file