diff --git a/crypto/sol/app.py b/crypto/sol/app.py
index 3d6ecb6..2084876 100644
--- a/crypto/sol/app.py
+++ b/crypto/sol/app.py
@@ -1,180 +1,59 @@
import asyncio
-import datetime
-import json
import logging
-import os
-from typing import Dict, Any
-
import uvicorn
+from fastapi import FastAPI
+from fastapi.middleware.wsgi import WSGIMiddleware
+
from asgiref.wsgi import WsgiToAsgi
from dotenv import load_dotenv
-
+from modules.webui import init_app
+from modules.utils import telegram_utils
from config import DO_WATCH_WALLET
from modules.SolanaAPI import SAPI
-from modules.log_processor import watch_for_new_logs
-from modules.utils import telegram_utils
-from modules.webui import init_app, teardown_app
+from modules.log_processor import LogProcessor
-# Load environment variables
load_dotenv()
load_dotenv('.env.secret')
-
-# Configure logging
logger = logging.getLogger(__name__)
-class LogProcessor:
- @staticmethod
- def save_log(log: Dict[str, Any]) -> None:
- """Save log to JSON file with timestamp."""
- try:
- os.makedirs('./logs', exist_ok=True)
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
- filename = f"./logs/log_{timestamp}.json"
- with open(filename, 'w') as f:
- json.dump(log, f, indent=2)
- except Exception as e:
- logger.error(f"Error saving RPC log: {e}")
-
- @staticmethod
- def extract_transaction_details(logs: list) -> Dict[str, Any]:
- """Extract transaction details from logs."""
- tr_details = {
- "order_id": None,
- "token_in": None,
- "token_out": None,
- "amount_in": 0,
- "amount_out": 0,
- "amount_in_USD": 0,
- "amount_out_USD": 0,
- "percentage_swapped": 0
- }
-
- before_source_balance = 0
- source_token_change = 0
-
- for i, log_entry in enumerate(logs):
- if tr_details["order_id"] is None and "order_id" in log_entry:
- tr_details["order_id"] = log_entry.split(":")[-1].strip()
- tr_details["token_in"] = logs[i + 1].split(":")[-1].strip()
- tr_details["token_out"] = logs[i + 2].split(":")[-1].strip()
-
- if "source_token_change" in log_entry:
- parts = log_entry.split(", ")
- for part in parts:
- if "source_token_change" in part:
- tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6
- elif "destination_token_change" in part:
- tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6
-
- if "before_source_balance" in log_entry:
- before_source_balance = float(log_entry.split(":")[-1].strip()) / 10 ** 6
- if "source_token_change" in log_entry:
- source_token_change = float(log_entry.split(":")[-1].strip()) / 10 ** 6
-
- if before_source_balance > 0 and source_token_change > 0:
- tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100
- if tr_details["percentage_swapped"] > 100:
- tr_details["percentage_swapped"] /= 1000
-
- return tr_details
-
- @staticmethod
- async def process_log(log_result: Dict[str, Any]) -> Dict[str, Any]:
- """Process a single log entry."""
- if log_result['value']['err']:
- return
-
- logs = log_result['value']['logs']
- swap_operations = [
- 'Program log: Instruction: Swap',
- 'Program log: Instruction: Swap2',
- 'Program log: Instruction: SwapExactAmountIn',
- 'Program log: Instruction: SwapV2'
- ]
-
- try:
- if not any(op in logs for op in swap_operations):
- return
-
- LogProcessor.save_log(log_result)
- tx_signature = log_result['value']['signature']
- tr_details = LogProcessor.extract_transaction_details(logs)
-
- if not all([tr_details["token_in"], tr_details["token_out"],
- tr_details["amount_in"], tr_details["amount_out"]]):
- tr_details = await SAPI.get_transaction_details_info(tx_signature, logs)
-
- # Update token information
- token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]]
- token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]]
-
- tr_details.update({
- "symbol_in": token_in.get('symbol'),
- "symbol_out": token_out.get('symbol'),
- "amount_in_USD": tr_details['amount_in'] * token_in.get('price', 0),
- "amount_out_USD": tr_details['amount_out'] * token_out.get('price', 0)
- })
-
- # Send notification
- message = (
- f"Swap detected: \n"
- f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} "
- f"({tr_details['percentage_swapped']:.2f}% ) swapped for {tr_details['symbol_out']}"
- )
- await telegram_utils.send_telegram_message(message)
-
- # Follow up actions
- await SAPI.follow_move(tr_details)
- await SAPI.save_token_info()
-
- except Exception as e:
- logger.error(f"Error processing log: {e}")
- await telegram_utils.send_telegram_message("Not followed! Error following move.")
-
- return tr_details
-
class Bot:
@staticmethod
async def initialize():
- """Initialize bot and start monitoring."""
await telegram_utils.initialize()
- await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")
-
- asyncio.create_task(watch_for_new_logs())
+ await telegram_utils.send_telegram_message("Solana Agent Started")
+ asyncio.create_task(LogProcessor.watch_for_new_logs())
if DO_WATCH_WALLET:
asyncio.create_task(SAPI.wallet_watch_loop())
-async def start_server():
- """Run the ASGI server."""
- config = uvicorn.Config(
+app = init_app()
+asgi_app = WsgiToAsgi(app)
+
+async def main():
+ global bot, PROCESSING_LOG, pk
+
+ await telegram_utils.initialize()
+ await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...")
+ # process_transaction
+ await SAPI.wallet_watch_loop()
+
+def run_asyncio_tasks():
+ asyncio.run(main())
+
+if __name__ == '__main__':
+ import multiprocessing
+
+ # Start the asyncio tasks in a separate process
+ process = multiprocessing.Process(target=run_asyncio_tasks)
+ process.start()
+
+ # Run the ASGI server
+ uvicorn.run(
"app:asgi_app",
host="0.0.0.0",
port=3001,
- log_level="info",
+ log_level="debug",
reload=True
)
- server = uvicorn.Server(config)
- await server.serve()
-async def main():
- """Main application entry point."""
- # Initialize app and create ASGI wrapper
- app = await init_app()
- global asgi_app
- asgi_app = WsgiToAsgi(app)
-
- # Initialize bot
- await Bot.initialize()
-
- # Start server
- try:
- await start_server()
- except KeyboardInterrupt:
- logger.info("Shutting down...")
- await teardown_app()
-
-if __name__ == '__main__':
- try:
- asyncio.run(main())
- except KeyboardInterrupt:
- logger.info("Application terminated by user")
\ No newline at end of file
+ # Wait for the asyncio tasks to complete
+ process.join()
diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py
index cbce708..0be36c7 100644
--- a/crypto/sol/modules/SolanaAPI.py
+++ b/crypto/sol/modules/SolanaAPI.py
@@ -84,7 +84,6 @@ class SolanaWS:
self.subscription_id = None
self.message_queue = asyncio.Queue()
self.on_message = on_message
- self.websocket = None
self.last_msg_responded = False
async def save_log(log):
@@ -100,8 +99,6 @@ class SolanaWS:
async def connect(self):
while True:
- if self.websocket is None or self.websocket.closed:
- await self.connect()
try:
current_url = random.choice(SOLANA_ENDPOINTS)
self.websocket = await websockets.connect(current_url, ping_interval=30, ping_timeout=10)
@@ -178,8 +175,10 @@ class SolanaWS:
response_data = json.loads(response)
self.last_msg_responded = True
- if 'result' in response_data:
+ if 'params' in response_data and 'result' in response_data["params"]:
await self.message_queue.put(response_data['result'])
+ else:
+ logger.warning(f"Unexpected response: {response_data}")
if one:
break
except websockets.exceptions.ConnectionClosedError:
@@ -285,15 +284,16 @@ class SolanaAPI:
await solana_ws.connect()
await solana_ws.subscribe()
+ receive_task = asyncio.create_task(solana_ws.receive_messages())
+ process_task = asyncio.create_task(solana_ws.process_messages())
+
+
if first_subscription:
first_subscription = False
await async_safe_call(self.on_initial_subscription, solana_ws.subscription_id)
await async_safe_call(self.on_bot_message,f"Solana mainnet connected ({solana_ws.subscription_id})...")
- receive_task = asyncio.create_task(solana_ws.receive_messages())
- process_task = asyncio.create_task(solana_ws.process_messages())
-
try:
await asyncio.gather(receive_task, process_task)
except asyncio.CancelledError:
@@ -1211,7 +1211,7 @@ class SolanaDEX:
if mint in self.TOKENS_INFO:
token_name = self.TOKENS_INFO[mint].get('symbol')
elif doGetTokenName:
- token_name = await self.get_token_metadata_symbol(mint) or 'N/A'
+ token_name = await SAPI.get_token_metadata_symbol(mint) or 'N/A'
self.TOKENS_INFO[mint] = {'symbol': token_name}
await asyncio.sleep(2)
diff --git a/crypto/sol/modules/log_processor.py b/crypto/sol/modules/log_processor.py
index b23a90a..2f9362d 100644
--- a/crypto/sol/modules/log_processor.py
+++ b/crypto/sol/modules/log_processor.py
@@ -1,60 +1,182 @@
+from asyncio.log import logger
+import datetime
+import json
import os
import asyncio
from pathlib import Path
+from typing import Any, Dict
+
+from .utils import TelegramUtils
from .storage import Storage
-from .SolanaAPI import SolanaAPI
+from .SolanaAPI import SAPI, SolanaAPI
LOG_DIRECTORY = "./logs"
FILE_MASK = "wh_*.json"
-async def process_log_file(file_path):
- # Read the file and extract transaction data
- with open(file_path, 'r') as file:
- data = file.read()
- # Assume data is parsed into these variables
- wallet_id = "extracted_wallet_id"
- transaction_type = "extracted_transaction_type"
- sell_currency = "extracted_sell_currency"
- sell_amount = 0.0
- sell_value = 0.0
- buy_currency = "extracted_buy_currency"
- buy_amount = 0.0
- buy_value = 0.0
- solana_signature = "extracted_solana_signature"
- details = {}
- # Process the webhook data
- solana_api = SolanaAPI()
- transaction_data = await solana_api.process_wh(data)
- # Check if the transaction already exists
- existing_transaction = await Storage.get_prisma().transaction.find_first(
- where={'solana_signature': solana_signature}
- )
+class LogProcessor:
+ @staticmethod
+ def save_log(log: Dict[str, Any]) -> None:
+ """Save log to JSON file with timestamp."""
+ try:
+ os.makedirs('./logs', exist_ok=True)
+ timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
+ filename = f"./logs/log_{timestamp}.json"
+ with open(filename, 'w') as f:
+ json.dump(log, f, indent=2)
+ except Exception as e:
+ logger.error(f"Error saving RPC log: {e}")
- if not existing_transaction:
- # Store the transaction if it doesn't exist
- transaction_data = {
- 'wallet_id': wallet_id,
- 'type': transaction_type,
- 'sell_currency': sell_currency,
- 'sell_amount': sell_amount,
- 'sell_value': sell_value,
- 'buy_currency': buy_currency,
- 'buy_amount': buy_amount,
- 'buy_value': buy_value,
- 'solana_signature': solana_signature,
- 'details': details
+ @staticmethod
+ def extract_transaction_details(logs: list) -> Dict[str, Any]:
+ """Extract transaction details from logs."""
+ tr_details = {
+ "order_id": None,
+ "token_in": None,
+ "token_out": None,
+ "amount_in": 0,
+ "amount_out": 0,
+ "amount_in_USD": 0,
+ "amount_out_USD": 0,
+ "percentage_swapped": 0
}
- storage = Storage()
- await storage.store_transaction(transaction_data)
- # Rename the file to append '_saved'
- new_file_path = file_path.with_name(file_path.stem + "_saved" + file_path.suffix)
- os.rename(file_path, new_file_path)
+ before_source_balance = 0
+ source_token_change = 0
-async def watch_for_new_logs():
- while True:
- for file_path in Path(LOG_DIRECTORY).glob(FILE_MASK):
- await process_log_file(file_path)
- await asyncio.sleep(10) # Check every 10 seconds
+ for i, log_entry in enumerate(logs):
+ if tr_details["order_id"] is None and "order_id" in log_entry:
+ tr_details["order_id"] = log_entry.split(":")[-1].strip()
+ tr_details["token_in"] = logs[i + 1].split(":")[-1].strip()
+ tr_details["token_out"] = logs[i + 2].split(":")[-1].strip()
+
+ if "source_token_change" in log_entry:
+ parts = log_entry.split(", ")
+ for part in parts:
+ if "source_token_change" in part:
+ tr_details["amount_in"] = float(part.split(":")[-1].strip()) / 10 ** 6
+ elif "destination_token_change" in part:
+ tr_details["amount_out"] = float(part.split(":")[-1].strip()) / 10 ** 6
+
+ if "before_source_balance" in log_entry:
+ before_source_balance = float(log_entry.split(":")[-1].strip()) / 10 ** 6
+ if "source_token_change" in log_entry:
+ source_token_change = float(log_entry.split(":")[-1].strip()) / 10 ** 6
+
+ if before_source_balance > 0 and source_token_change > 0:
+ tr_details["percentage_swapped"] = (source_token_change / before_source_balance) * 100
+ if tr_details["percentage_swapped"] > 100:
+ tr_details["percentage_swapped"] /= 1000
+
+ return tr_details
+
+ @staticmethod
+ async def process_log(log_result: Dict[str, Any]) -> Dict[str, Any]:
+ """Process a single log entry."""
+ if log_result['value']['err']:
+ return
+
+ logs = log_result['value']['logs']
+ swap_operations = [
+ 'Program log: Instruction: Swap',
+ 'Program log: Instruction: Swap2',
+ 'Program log: Instruction: SwapExactAmountIn',
+ 'Program log: Instruction: SwapV2'
+ ]
+
+ try:
+ if not any(op in logs for op in swap_operations):
+ return
+
+ LogProcessor.save_log(log_result)
+ tx_signature = log_result['value']['signature']
+ tr_details = LogProcessor.extract_transaction_details(logs)
+
+ if not all([tr_details["token_in"], tr_details["token_out"],
+ tr_details["amount_in"], tr_details["amount_out"]]):
+ tr_details = await SAPI.get_transaction_details_info(tx_signature, logs)
+
+ # Update token information
+ token_in = SAPI.dex.TOKENS_INFO[tr_details["token_in"]]
+ token_out = SAPI.dex.TOKENS_INFO[tr_details["token_out"]]
+
+ tr_details.update({
+ "symbol_in": token_in.get('symbol'),
+ "symbol_out": token_out.get('symbol'),
+ "amount_in_USD": tr_details['amount_in'] * token_in.get('price', 0),
+ "amount_out_USD": tr_details['amount_out'] * token_out.get('price', 0)
+ })
+
+ # Send notification
+ message = (
+ f"Swap detected: \n"
+ f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} "
+ f"({tr_details['percentage_swapped']:.2f}% ) swapped for {tr_details['symbol_out']}"
+ )
+ await TelegramUtils.send_telegram_message(message)
+
+ # Follow up actions
+ await SAPI.follow_move(tr_details)
+ await SAPI.save_token_info()
+
+ except Exception as e:
+ logger.error(f"Error processing log: {e}")
+ await TelegramUtils.send_telegram_message("Not followed! Error following move.")
+
+ return tr_details
+
+
+ @staticmethod
+ async def process_log_fileDump(self, file_path):
+ # Read the file and extract transaction data
+ with open(file_path, 'r') as file:
+ data = file.read()
+ # Assume data is parsed into these variables
+ wallet_id = "extracted_wallet_id"
+ transaction_type = "extracted_transaction_type"
+ sell_currency = "extracted_sell_currency"
+ sell_amount = 0.0
+ sell_value = 0.0
+ buy_currency = "extracted_buy_currency"
+ buy_amount = 0.0
+ buy_value = 0.0
+ solana_signature = "extracted_solana_signature"
+ details = {}
+
+ # Process the webhook data
+ solana_api = SolanaAPI()
+ transaction_data = await solana_api.process_wh(data)
+
+ # Check if the transaction already exists
+ existing_transaction = await Storage.get_prisma().transaction.find_first(
+ where={'solana_signature': solana_signature}
+ )
+
+ if not existing_transaction:
+ # Store the transaction if it doesn't exist
+ transaction_data = {
+ 'wallet_id': wallet_id,
+ 'type': transaction_type,
+ 'sell_currency': sell_currency,
+ 'sell_amount': sell_amount,
+ 'sell_value': sell_value,
+ 'buy_currency': buy_currency,
+ 'buy_amount': buy_amount,
+ 'buy_value': buy_value,
+ 'solana_signature': solana_signature,
+ 'details': details
+ }
+ storage = Storage()
+ await storage.store_transaction(transaction_data)
+
+ # Rename the file to append '_saved'
+ new_file_path = file_path.with_name(file_path.stem + "_saved" + file_path.suffix)
+ os.rename(file_path, new_file_path)
+
+ @staticmethod
+ async def watch_for_new_logs():
+ while True:
+ for file_path in Path(LOG_DIRECTORY).glob(FILE_MASK):
+ await LogProcessor.process_log_fileDump(file_path)
+ await asyncio.sleep(10) # Check every 10 seconds
diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py
index 53a8c60..3449343 100644
--- a/crypto/sol/modules/webui.py
+++ b/crypto/sol/modules/webui.py
@@ -21,7 +21,7 @@ from datetime import datetime
on_transaction = None
-async def init_app(tr_handler=None):
+def init_app(tr_handler=None):
global on_transaction
on_transaction = tr_handler
app = Flask(__name__, template_folder='../templates', static_folder='../static')
@@ -38,7 +38,7 @@ async def init_app(tr_handler=None):
if not storage.is_connected():
await storage.connect()
- asyncio.run(ensure_storage_connection())
+ ensure_storage_connection()
# oauth = OAuth(app)
# google = oauth.remote_app(