diff --git a/crypto/sol/.env b/crypto/sol/.env
index 9c03dfd..df79f65 100644
--- a/crypto/sol/.env
+++ b/crypto/sol/.env
@@ -14,6 +14,7 @@ DEVELOPER_CHAT_ID="777826553"
TELEGRAM_BOT_TOKEN="6749075936:AAHUHiPTDEIu6JH7S2fQdibwsu6JVG3FNG0"
DISPLAY_CURRENCY=USD
+FOLLOW_AMOUNT=2
# Niki's to Sync: [PROD]
FOLLOWED_WALLET="7keSmTZozjmuX66gd9GBSJYEHnMqsyutWpvuuKtXZKDH"
diff --git a/crypto/sol/app.py b/crypto/sol/app.py
index 51af546..ce16f04 100644
--- a/crypto/sol/app.py
+++ b/crypto/sol/app.py
@@ -3,24 +3,21 @@ import uvicorn
from asgiref.wsgi import WsgiToAsgi
import websockets
import json
-from flask import Flask, render_template, request, jsonify
import datetime
import base64
import os
import base58
-from dotenv import load_dotenv, set_key
-import aiohttp
-import requests
-import re
-import random
+from dotenv import load_dotenv
+
+
from threading import Thread
from solana.rpc.async_api import AsyncClient
from solders.transaction import VersionedTransaction
from solana.rpc.types import TxOpts
-from solana.rpc.commitment import Confirmed, Finalized, Processed
from solders.keypair import Keypair
from jupiter_python_sdk.jupiter import Jupiter
+from solana.rpc.commitment import Processed
from modules.webui import init_app
from modules.storage import init_db, store_transaction
@@ -192,7 +189,7 @@ async def process_log(log_result):
f"{tr_details['symbol_out']} \n"
)
await telegram_utils.send_telegram_message(message_text)
- await follow_move(tr_details)
+ await SAPI.follow_move(tr_details)
await SAPI.save_token_info()
except Exception as e:
@@ -210,10 +207,11 @@ async def process_log(log_result):
-
-
-async def follow_move(move):
- your_balances = await SAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
+async def follow_move_legacy(move):
+ global pk
+ if pk is None:
+ pk = await get_pk()
+ your_balances = await SAPI.dex.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
if your_balance_info is not None:
# Use the balance
@@ -243,8 +241,9 @@ async def follow_move(move):
amount_to_swap = move['amount_in']
else:
try:
- fixed_amount = float(FOLLOW_AMOUNT)
- amount_to_swap = min(fixed_amount, your_balance)
+ fixed_amount = float(FOLLOW_AMOUNT) # un USD
+ fixed_amount_in_token = fixed_amount / move["token_in_price"]
+ amount_to_swap = min(fixed_amount_in_token, your_balance)
except ValueError:
msg = f"Move not followed:\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number."
logging.warning(msg)
@@ -272,11 +271,12 @@ async def follow_move(move):
try:
notification = (
f"Initiating move:\n"
- f"Swapping {move['percentage_swapped']:.2f}% ({amount_to_swap:.2f}) {token_name_in} for {token_name_out}"
+ f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}"
+ + (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "")
)
# logging.info(notification)
# error_logger.info(notification)
- # await telegram_utils.send_telegram_message(notification)
+ await telegram_utils.send_telegram_message(notification)
except Exception as e:
logging.error(f"Error sending notification: {e}")
@@ -324,7 +324,7 @@ async def follow_move(move):
await telegram_utils.send_telegram_message(error_message)
amount = amount * 0.75
- await SAPI.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
+ await SAPI.dex.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
try:
if tx_details is None:
@@ -401,7 +401,7 @@ async def process_messages(websocket):
pk = None
-app = init_app()
+app = init_app(follow_move_legacy)
# Convert Flask app to ASGI
asgi_app = WsgiToAsgi(app)
@@ -427,7 +427,7 @@ if __name__ == '__main__':
# Run the ASGI server
uvicorn.run(
"app:asgi_app",
- host="127.0.0.1",
+ host="0.0.0.0",
port=3001,
log_level="debug",
reload=True
diff --git a/crypto/sol/config.py b/crypto/sol/config.py
index b34bfeb..d8a9dc8 100644
--- a/crypto/sol/config.py
+++ b/crypto/sol/config.py
@@ -68,4 +68,5 @@ def get_config():
"SOLANA_HTTP_URL": SOLANA_HTTP_URL,
"DISPLAY_CURRENCY": DISPLAY_CURRENCY,
"BOT_NAME": BOT_NAME,
+ "FOLLOW_AMOUNT": FOLLOW_AMOUNT,
}
\ No newline at end of file
diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py
index ed03dbc..b152671 100644
--- a/crypto/sol/modules/SolanaAPI.py
+++ b/crypto/sol/modules/SolanaAPI.py
@@ -1,10 +1,14 @@
+import struct
import sys
import os
import aiohttp
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-from solders import message
+# Get the directory where the current script is located
+script_dir = os.path.dirname(os.path.abspath(__file__))
+root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+
from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA
from dexscreener import DexscreenerClient
from solana.rpc.types import TokenAccountOpts, TxOpts
@@ -16,16 +20,18 @@ from solana.transaction import Transaction
from spl.token.client import Token
from base64 import b64decode
import base58
-from solders.rpc.requests import GetTransaction
-from solders.signature import Signature
-from solders.pubkey import Pubkey
-from solders.keypair import Keypair
+from threading import Thread
+from solana.rpc.async_api import AsyncClient
+from solana.rpc.types import TxOpts
+from solana.rpc.commitment import Confirmed, Finalized, Processed
+
from solders.transaction import VersionedTransaction
-from solders.transaction import Transaction
-from solders.message import Message
-from solders.instruction import Instruction
-from solders.hash import Hash
-from solders.instruction import CompiledInstruction
+from solders.keypair import Keypair
+from solana.rpc.async_api import AsyncClient
+from solana.rpc.commitment import Processed
+from solana.rpc.types import TxOpts
+
+from jupiter_python_sdk.jupiter import Jupiter
import asyncio
import json
import logging
@@ -37,6 +43,7 @@ from datetime import datetime
from solana.rpc.types import TokenAccountOpts, TxOpts
from typing import List, Dict, Any, Tuple
import traceback
+import base64
# # # solders/solana libs (solana_client) # # #
from spl.token._layouts import MINT_LAYOUT
@@ -53,9 +60,9 @@ logger = logging.getLogger(__name__)
PING_INTERVAL = 30
SUBSCRIBE_INTERVAL = 10*60 # Resubscribe every 1 minute
-from config import ( FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS, YOUR_WALLET)
+from config import (FOLLOW_AMOUNT, FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY, SOLANA_ENDPOINTS, YOUR_WALLET, SOLANA_WS_URL)
-from modules.utils import telegram_utils, async_safe_call
+from modules.utils import telegram_utils, async_safe_call, get_pk
# Use the production Solana RPC endpoint
solana_client = AsyncClient(SOLANA_HTTP_URL)
@@ -182,7 +189,7 @@ class SolanaWS:
await self.websocket.close()
logger.info("WebSocket connection closed")
- async def solana_jsonrpc(method, params=None, jsonParsed=True):
+ async def solana_jsonrpc(self, method, params=None, jsonParsed=True):
if not isinstance(params, list):
params = [params] if params is not None else []
@@ -211,6 +218,8 @@ class SolanaWS:
return None
class SolanaAPI:
+
+ pk = None
def __init__(self, process_transaction_callback = None, on_initial_subscription_callback = None, on_bot_message=None):
self.process_transaction = process_transaction_callback
self.on_initial_subscription = on_initial_subscription_callback
@@ -267,10 +276,16 @@ class SolanaAPI:
if solana_ws.websocket:
await solana_ws.close()
await async_safe_call(self.on_bot_message,"Reconnecting...")
- receive_task.cancel()
- process_task.cancel()
+ if self.receive_task and not self.receive_task.cancelled():
+ receive_task.cancel()
+ if self.process_task and not self.process_task.cancelled():
+ process_task.cancel()
except Exception as e:
logger.error(f"An error occurred while unsubscribing: {e}")
+ finally:
+ self.receive_task = None
+ self.process_task = None
+
await asyncio.sleep(5)
async def get_last_transactions(self, account_address, check_interval=300, limit=1000):
@@ -307,11 +322,9 @@ class SolanaAPI:
await asyncio.sleep(1)
- async def get_token_metadata_symbol(mint_address):
- global TOKENS_INFO
-
- if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]:
- return TOKENS_INFO[mint_address].get('symbol')
+ async def get_token_metadata_symbol(self, mint_address):
+ if mint_address in DEX.TOKENS_INFO and 'symbol' in DEX.TOKENS_INFO[mint_address]:
+ return DEX.TOKENS_INFO[mint_address].get('symbol')
try:
account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", mint_address)
@@ -320,32 +333,82 @@ class SolanaAPI:
if 'parsed' in account_data_data and 'info' in account_data_data['parsed']:
account_data_info = account_data_data['parsed']['info']
if 'decimals' in account_data_info:
- if mint_address in TOKENS_INFO:
- TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals']
+ if mint_address in DEX.TOKENS_INFO:
+ DEX.TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals']
else:
- TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']}
+ DEX.TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']}
if 'tokenName' in account_data_info:
- if mint_address in TOKENS_INFO:
- TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName']
+ if mint_address in DEX.TOKENS_INFO:
+ DEX.TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName']
else:
- TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']}
+ DEX.TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']}
- metadata = await get_token_metadata(mint_address)
+ metadata = await self.get_token_metadata(mint_address)
if metadata:
- if mint_address in TOKENS_INFO:
- TOKENS_INFO[mint_address].update(metadata)
+ if mint_address in DEX.TOKENS_INFO:
+ DEX.TOKENS_INFO[mint_address].update(metadata)
else:
- TOKENS_INFO[mint_address] = metadata
- await save_token_info()
- # TOKENS_INFO[mint_address] = metadata
+ DEX.TOKENS_INFO[mint_address] = metadata
+ await DEX.save_token_info()
+ # DEX.TOKENS_INFO[mint_address] = metadata
# return metadata.get('symbol') or metadata.get('name')
- return TOKENS_INFO[mint_address].get('symbol')
+ return DEX.TOKENS_INFO[mint_address].get('symbol')
except Exception as e:
logging.error(f"Error fetching token name for {mint_address}: {str(e)}")
return None
- async def get_transaction_details_rpc(tx_signature, readfromDump=False):
- global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO
+ async def get_token_metadata(self, mint_address):
+ try:
+ # Convert mint_address to PublicKey if it's a string
+ if isinstance(mint_address, str):
+ mint_pubkey = Pubkey.from_string(mint_address)
+ else:
+ mint_pubkey = mint_address
+
+ # Derive metadata account address
+
+ metadata_program_id = Pubkey.from_string("metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s")
+ metadata_account = Pubkey.find_program_address(
+ [b"metadata", bytes(metadata_program_id), bytes(mint_pubkey)],
+ metadata_program_id
+ )[0]
+
+ # Fetch metadata account info
+ metadata_account_info = await solana_client.get_account_info(metadata_account)
+
+ if metadata_account_info.value is not None:
+ data = metadata_account_info.value.data
+ # name = get_token_name_metadata(data).rstrip("\x00")
+ # Skip the first 1 + 32 + 32 bytes (1 byte for version, 32 bytes each for update authority and mint)
+ offset = 1 + 32 + 32
+ # Read the name length (u32)
+ name_length = struct.unpack(" Dict[str, Any]:
- global TOKENS_INFO
-
+ async def get_transaction_details_info(self, tx_signature_str: str, logs: List[str]) -> Dict[str, Any]:
tr_info = await self.get_transaction_details_with_retry(tx_signature_str)
-
try:
tr_info['percentage_swapped'] = (tr_info['amount_in'] / tr_info['before_source_balance']) * 100 if tr_info['before_source_balance'] > 0 else 50
except Exception as e:
@@ -503,7 +563,7 @@ class SolanaAPI:
# return float(balance['uiTokenAmount']['amount'])
# return 0.0
- async def get_transaction_details_with_retry(transaction_id, retry_delay = 5, max_retries = 16):
+ async def get_transaction_details_with_retry(self, transaction_id, retry_delay = 5, max_retries = 16):
# wait for the transaction to be confirmed
# await async_client.wait_for_confirmation(Signature.from_string(transaction_id))
# query every 5 seconds for the transaction details until not None or 30 seconds
@@ -521,7 +581,7 @@ class SolanaAPI:
return tx_details
- async def get_swap_transaction_details(tx_signature_str):
+ async def get_swap_transaction_details(self, tx_signature_str):
t = await self.get_transaction(Signature.from_string(tx_signature_str), max_supported_transaction_version=0)
try:
parsed_result = {
@@ -564,7 +624,7 @@ class SolanaAPI:
return None
- async def get_token_balance_rpc(wallet_address, token_address):
+ async def get_token_balance_rpc(self, wallet_address, token_address):
try:
accounts = await self.solana_ws.solana_jsonrpc("getTokenAccountsByOwner", [
wallet_address,
@@ -601,7 +661,163 @@ class SolanaAPI:
logging.error(f"Error getting balance for {token_address} in {wallet_address}: {str(e)} \r\n {e}")
return 0
-
+ async def follow_move(self,move):
+ your_balances = await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
+ your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None)
+ if your_balance_info is not None:
+ # Use the balance
+ print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}")
+ else:
+ print(f"No ballance found for {move['symbol_in']}. Skipping move.")
+ await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.")
+ return
+
+ your_balance = your_balance_info['amount']
+
+
+ token_info = DEX.TOKENS_INFO.get(move['token_in'])
+ token_name_in = token_info.get('symbol') or await SAPI.get_token_metadata_symbol(move['token_in'])
+ token_name_out = DEX.TOKENS_INFO[move['token_out']].get('symbol') or await SAPI.get_token_metadata_symbol(move['token_out'])
+
+ if not your_balance:
+ msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move."
+ logging.warning(msg)
+ await telegram_utils.send_telegram_message(msg)
+ return
+
+ if FOLLOW_AMOUNT == 'percentage':
+ # Calculate the amount to swap based on the same percentage as the followed move
+ amount_to_swap = your_balance * (move['percentage_swapped'] / 100)
+ elif FOLLOW_AMOUNT == 'exact':
+ amount_to_swap = move['amount_in']
+ else:
+ try:
+ fixed_amount = float(FOLLOW_AMOUNT) # un USD
+ fixed_amount_in_token = fixed_amount / move["token_in_price"]
+ amount_to_swap = min(fixed_amount_in_token, your_balance)
+ except ValueError:
+ msg = f"Move not followed:\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number."
+ logging.warning(msg)
+ await telegram_utils.send_telegram_message(msg)
+ return
+
+ amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have
+
+ decimals = token_info.get('decimals')
+ # Convert to lamports
+ # if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9
+ amount = int(amount_to_swap * 10**decimals)
+ amount = int(amount)
+ logging.debug(f"Calculated amount in lamports: {amount}")
+
+ if your_balance < amount_to_swap: # should not happen
+ msg = (
+ f"Warning:\n"
+ f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway."
+ )
+ logging.warning(msg)
+ await telegram_utils.send_telegram_message(msg)
+ try:
+
+ try:
+ notification = (
+ f"Initiating move:\n"
+ f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}"
+ + (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "")
+ )
+ # logging.info(notification)
+ # error_logger.info(notification)
+ # await telegram_utils.send_telegram_message(notification)
+ except Exception as e:
+ logging.error(f"Error sending notification: {e}")
+
+ if self.pk is None:
+ self.pk = await get_pk()
+ for retry in range(3):
+ try:
+ private_key = Keypair.from_bytes(base58.b58decode(self.pk))
+ async_client = AsyncClient(SOLANA_WS_URL)
+ jupiter = Jupiter(async_client, private_key)
+ transaction_data = await jupiter.swap(
+ input_mint=move['token_in'],
+ output_mint=move['token_out'],
+ amount=int(amount),
+ slippage_bps=300, # Increased to 3%
+ )
+ logging.info(f"Initiating move. Transaction data:\n {transaction_data}")
+ # error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}")
+ raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data))
+ message = raw_transaction.message
+
+ signature = private_key.sign_message( bytes(message) )
+ # signature = private_key.sign_message(message.to_bytes_versioned())
+ signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature])
+ opts = TxOpts(skip_preflight=False, preflight_commitment=Processed)
+
+ # send the transaction
+ result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts)
+
+ transaction_id = json.loads(result.to_json())['result']
+ print(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}")
+ # append to notification
+ notification += f"\n\nTransaction: {transaction_id}"
+
+ await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}")
+ tx_details = await SAPI.get_transaction_details_with_retry(transaction_id)
+
+ if tx_details is not None:
+ break
+ else:
+ logging.warning(f"Failed to get transaction details for {transaction_id}. Probably transaction failed. Retrying again...")
+ await asyncio.sleep(3)
+ except Exception as e:
+ error_message = f"Move Failed:\n{str(e)}\n{transaction_data}\n{move}"
+ logging.error(error_message)
+ # log the errors to /logs/errors.log
+ # error_logger.error(error_message)
+ # error_logger.exception(e)
+ await telegram_utils.send_telegram_message(error_message)
+ amount = int(amount * 0.75)
+
+ await DEX.get_wallet_balances(YOUR_WALLET, doGetTokenName=False)
+
+ try:
+ if tx_details is None:
+ logging.info(f"Failed to get transaction details for {transaction_id}")
+ notification = (
+ f"Move Followed, failed to get transaction details.\n"
+ f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['token_in']}) "
+ f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
+ f"\n\nTransaction: {transaction_id}"
+ # log_successful_swap ()
+ )
+
+ else:
+ notification = (
+ f"Move Followed:\n"
+ f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['symbol_in']}) "
+ f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n"
+ f"for {tx_details['amount_out']:.2f} {token_name_out}"
+ # f"Amount In USD: {tr_details['amount_in_USD']}\n"
+ f"\n\nTransaction: {transaction_id}"
+ )
+ logging.info(notification)
+ await telegram_utils.send_telegram_message(notification)
+ except Exception as e:
+ logging.error(f"Error sending notification: {e}")
+
+ except Exception as e:
+ error_message = f"Swap Follow Error:\n{str(e)}"
+ logging.error(error_message)
+ # log the errors to /logs/errors.log
+ # error_logger.error(error_message)
+ # error_logger.exception(e)
+ # if error_message contains 'Program log: Error: insufficient funds'
+ if 'insufficient funds' in error_message:
+ await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.")
+ else:
+ await telegram_utils.send_telegram_message(error_message)
+
@@ -614,9 +830,11 @@ class SolanaDEX:
self.TOKEN_ADDRESSES = {}
self.FOLLOWED_WALLET_VALUE = 0
self.YOUR_WALLET_VALUE = 0
+ self.token_info_path = os.path.normpath(os.path.join(script_dir, '..', 'cache', 'token_info.json'))
try:
- with open('../logs/token_info.json', 'r') as f:
+ with open(self.token_info_path, 'r') as f:
self.TOKENS_INFO = json.load(f)
+
except Exception as e:
logging.error(f"Error loading token info: {str(e)}")
@@ -654,7 +872,7 @@ class SolanaDEX:
for token, price in prices.items():
token_info = self.TOKENS_INFO.setdefault(token, {})
if 'symbol' not in token_info:
- token_info['symbol'] = await self.get_token_metadata_symbol(token)
+ token_info['symbol'] = await SAPI.get_token_metadata_symbol(token)
token_info['price'] = price
return prices
@@ -914,21 +1132,16 @@ class SolanaDEX:
# save token info to file
await self.save_token_info()
+ async def save_token_info(self):
+ with open(self.token_info_path, 'w') as f:
+ json.dump(self.TOKENS_INFO, f, indent=2)
+
@staticmethod
def safe_get_property(obj, prop):
return obj.get(prop, 'N/A')
- async def get_token_metadata_symbol(self, token_address):
- # Implement this method to fetch token metadata symbol
- pass
- async def save_token_info(self):
- # Implement this method to save token info to a file
- pass
-
-async def save_token_info():
- with open('./logs/token_info.json', 'w') as f:
- json.dump(TOKENS_INFO, f, indent=2)
+
DEX = SolanaDEX(DISPLAY_CURRENCY)
SAPI = SolanaAPI( on_initial_subscription_callback=DEX.list_initial_wallet_states(FOLLOWED_WALLET,YOUR_WALLET))
\ No newline at end of file
diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py
index 35b142b..8cf6f77 100644
--- a/crypto/sol/modules/utils.py
+++ b/crypto/sol/modules/utils.py
@@ -11,7 +11,7 @@ from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME
import asyncio
-from typing import Callable, Any
+from typing import Callable, Any, Union, Coroutine
import time
import logging
from logging.handlers import RotatingFileHandler
@@ -111,7 +111,11 @@ def safe_get_property(info, property_name, default='Unknown'):
value = info.get(property_name, default)
return str(value) if value is not None else str(default)
-async def async_safe_call(func: Callable, *args: Any, **kwargs: Any) -> Any:
+async def async_safe_call(
+ func: Union[Callable, Coroutine, None],
+ *args: Any,
+ **kwargs: Any
+) -> Any:
"""
Safely call a function that might be synchronous, asynchronous, or a coroutine object.
@@ -123,18 +127,30 @@ async def async_safe_call(func: Callable, *args: Any, **kwargs: Any) -> Any:
if func is None:
return None
- if callable(func):
- if asyncio.iscoroutinefunction(func):
- return await func(*args, **kwargs)
+ try:
+ if asyncio.iscoroutine(func):
+ # If func is already a coroutine object, just await it
+ return await func
+ elif callable(func):
+ if asyncio.iscoroutinefunction(func):
+ # If func is an async function, call it with args and await
+ return await func(*args, **kwargs)
+ else:
+ # If func is a regular function, just call it
+ return func(*args, **kwargs)
else:
- return func(*args, **kwargs)
- elif asyncio.iscoroutine(func):
- # If func is already a coroutine object, just await it
- return await func
- else:
- logging.warning(f"Expected a callable or coroutine, but got {type(func)}: {func}")
+ logging.warning(f"Expected a callable or coroutine, but got {type(func)}: {func}")
+ return None
+
+ except RuntimeError as e:
+ if "cannot reuse already awaited coroutine" in str(e):
+ logging.error(f"Attempted to reuse an already awaited coroutine: {func}")
+ else:
+ logging.error(f"Runtime error in async_safe_call: {e}")
+ return None
+ except Exception as e:
+ logging.error(f"Error in async_safe_call: {type(e).__name__}: {e}")
return None
-
# Create a global instance of TelegramUtils
telegram_utils = TelegramUtils()
diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py
index 79a2391..0ea4aec 100644
--- a/crypto/sol/modules/webui.py
+++ b/crypto/sol/modules/webui.py
@@ -2,13 +2,19 @@ from flask import Flask, jsonify, request, render_template, redirect, url_for
# from flask_oauthlib.client import OAuth
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
import secrets
+import json
from modules import storage, utils, SolanaAPI
+from modules.utils import async_safe_call
import os
import logging
+from datetime import datetime
-def init_app():
+on_transaction = None
+
+def init_app(tr_handler=None):
+ global on_transaction
+ on_transaction = tr_handler
app = Flask(__name__, template_folder='../templates', static_folder='../static')
- app.config['SECRET_KEY'] = 'your-secret-key'
login_manager = LoginManager(app)
login_manager.login_view = 'login'
@@ -38,7 +44,7 @@ def init_app():
async def transaction_notified(wallet, tx_signature):
try:
logger.info(f"Processing transaction notification for wallet: {wallet}, tx: {tx_signature}")
- request_data = request.get_json()
+ request_data = request.get_json() if request.is_json else None
if not request_data:
# Process the transaction
# tr = await get_swap_transaction_details(tx_signature)
@@ -50,6 +56,9 @@ def init_app():
# ToDo - probably optimize
tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in'])
tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out'])
+ prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']])
+ tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in']
+ tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out']
notification = (
f"Got TXN notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n"
@@ -58,16 +67,15 @@ def init_app():
await utils.telegram_utils.send_telegram_message(notification)
# Store the notified transaction in the database
- storage.store_transaction(tr)
-
+ await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
# Attempt to execute the copytrade transaction
try:
await SolanaAPI.SAPI.follow_move(tr)
# Store the successful copytrade transaction
- storage.store_copytrade_transaction(tr, success=True)
+ await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
except Exception as e:
# Store the failed copytrade transaction
- storage.store_copytrade_transaction(tr, success=False, error=str(e))
+ await storage.store_transaction(wallet, "SWAP_FAIL", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
logging.error(f"Copytrade transaction failed: {e}")
# ToDo - probably optimize
await SolanaAPI.SAPI.save_token_info()
@@ -76,8 +84,136 @@ def init_app():
logging.error(f"Error processing transaction: {e}")
return jsonify({"error": "Failed to process transaction"}), 500
+ @app.route('/wh', methods=['POST'])
+ async def webhook():
+ try:
+ current_time = datetime.now().strftime("%Y%m%d-%H%M%S")
+ logger.info("Processing webhook")
+ request_data = request.get_json() if request.is_json else None
+ if not request_data:
+ return jsonify({"error": "No data in request"}), 400
+ logger.info(f"Webhook data: {request_data}")
+ # save dump to /cache/last-webhook-{datetime}.json
+
+ with open( os.path.join(SolanaAPI.root_path, 'logs', f'wh_{current_time}.json') , 'w') as f:
+ json.dump(request_data, f)
+
+ process_wh(request_data)
+ return jsonify({"status": "Webhook processed"}), 200
+ except Exception as e:
+ logging.error(f"Error processing webhook: {e}")
+ return jsonify({"error": "Failed to process webhook"}), 500
+
+ # Flask route to retry processing the last log
+
+ async def process_wh( data):
+ global on_transaction
+
+ try:
+ if data[0].get('type') == "SWAP":
+ swap_event = data[0]['events'].get('swap')
+ if not swap_event:
+ logging.warning("No swap event found in data")
+ return
+
+ # Extract token input details from the first token input
+ token_inputs = swap_event.get('tokenInputs', [])
+ token_outputs = swap_event.get('tokenOutputs', [])
+
+ if not token_inputs or not token_outputs:
+ logging.warning("Missing token inputs or outputs")
+ return
+
+ tr = {
+ 'token_in': token_inputs[0]['mint'],
+ 'token_out': token_outputs[0]['mint'],
+ 'amount_in': float(token_inputs[0]['rawTokenAmount']['tokenAmount']) / 10**token_inputs[0]['rawTokenAmount']['decimals'],
+ 'amount_out': float(token_outputs[0]['rawTokenAmount']['tokenAmount']) / 10**token_outputs[0]['rawTokenAmount']['decimals'],
+ }
+
+ if not tr["token_in"] or not tr["token_out"] or tr["amount_in"] == 0 or tr["amount_out"] == 0:
+ logging.warning("Incomplete swap details found in logs. Getting details from transaction")
+ tx_signature = data[0].get('signature')
+ logs = data[0].get('logs', [])
+ tr = await SolanaAPI.SAPI.get_transaction_details_info(tx_signature, logs)
+
+ wallet = data[0]['feePayer'] # Using feePayer as the wallet address
+ tx_signature = data[0]['signature']
+
+ # ToDo - probably optimize
+ tr['symbol_in'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_in'])
+ tr['symbol_out'] = await SolanaAPI.SAPI.get_token_metadata_symbol(tr['token_out'])
+ prices = await SolanaAPI.DEX.get_token_prices([tr['token_in'], tr['token_out']])
+ tr["token_in_price"] = prices.get(tr['token_in'], 0)
+ tr["token_out_price"] = prices.get(tr['token_out'], 0)
+ tr['value_in_USD'] = prices.get(tr['token_in'], 0) * tr['amount_in']
+ tr['value_out_USD'] = prices.get(tr['token_out'], 0) * tr['amount_out']
+
+ notification = (
+ f"Got WH notification:: {tr['amount_in']} {tr['symbol_in']} swapped for {tr['symbol_out']} \n"
+ )
+ logging.info(notification)
+ await utils.telegram_utils.send_telegram_message(notification)
+
+ # Store the notified transaction in the database
+ storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
+ # Attempt to execute the copytrade transaction
+ try:
+ # await SolanaAPI.SAPI.follow_move(tr)
+ if on_transaction:
+ await async_safe_call( on_transaction, tr)
+ else:
+ await SolanaAPI.SAPI.follow_move(tr)
+ # Store the successful copytrade transaction
+ storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
+ except Exception as e:
+ # Store the failed copytrade transaction
+ storage.store_transaction(wallet, "SWAP_FAIL", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
+ logging.error(f"Copytrade transaction failed: {e}")
+ # ToDo - probably optimize
+ await SolanaAPI.DEX.save_token_info()
+
+ except Exception as e:
+ logging.error(f"Error processing transaction notification: {str(e)}")
+ # Log the full traceback for debugging
+ import traceback
+ logging.error(traceback.format_exc())
+
+ @app.route('/retry', methods=['GET'])
+ @app.route('/retry-last-log', methods=['GET'])
+ async def retry_last_log():
+ wh = request.args.get('wh', 'false').lower() == 'true'
+
+ latest_log_file = get_latest_log_file(wh)
+ if not latest_log_file:
+ return jsonify({"error": "No log files found"}), 404
+
+ try:
+ utils.log.info(f"Processing latest log file: {latest_log_file}")
+ with open(latest_log_file, 'r') as f:
+ log = json.load(f)
+
+ if wh:
+ result = await process_wh(log)
+ else:
+ result = await SolanaAPI.process_log(log)
+
+ return jsonify({
+ "file": latest_log_file,
+ "status": "Log dump processed successfully",
+ "result": result
+ }), 200
+
+ except Exception as e:
+ utils.log.error(f"Error processing log dump: {e}")
+ return jsonify({"error": "Failed to process log"}), 500
+
+
+
-
+ # # # #
+ # AUTHENTICATION
+ # # # #
@app.route('/login/google/authorized')
def authorized():
@@ -123,7 +259,9 @@ def init_app():
else:
return render_template('login.html', error='Invalid credentials')
elif request.args.get('google'):
- return google.authorize(callback=url_for('authorized', _external=True))
+ # Uncomment the following line if Google OAuth is set up
+ # return google.authorize(callback=url_for('authorized', _external=True))
+ return render_template('login.html', error='Google OAuth not configured')
return render_template('login.html')
@app.route('/logout')
@@ -155,43 +293,20 @@ def init_app():
def get_holdings(wallet_id):
holdings = storage.get_holdings(wallet_id)
return jsonify(holdings)
-
-
- # Flask route to retry processing the last log
- @app.route('/retry', methods=['GET'])
- @app.route('/retry-last-log', methods=['GET'])
- async def retry_last_log():
- latest_log_file = get_latest_log_file()
- if not latest_log_file:
- return jsonify({"error": "No log files found"}), 404
+
+ return app
- try:
- utils.log.info(f"Processing latest log file: {latest_log_file}")
- with open(latest_log_file, 'r') as f:
- log = json.load(f)
-
- result = await SolanaAPI.process_log(log)
-
- return jsonify({
- "file": latest_log_file,
- "status": "Log dump processed successfully",
- "result": result
- }), 200
-
- except Exception as e:
- utils.log.error(f"Error processing log dump: {e}")
- return jsonify({"error": "Failed to process log"}), 500
-
-
- return app
# Function to find the latest log file
-def get_latest_log_file():
- log_dir = './logs'
+def get_latest_log_file(wh:bool):
+ log_dir = os.path.join(SolanaAPI.root_path, 'logs')
try:
# files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f))]
# filter files mask log_20241005_004103_143116.json
- files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('log_')]
+ if wh:
+ files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('wh-')]
+ else:
+ files = [f for f in os.listdir(log_dir) if os.path.isfile(os.path.join(log_dir, f)) and f.startswith('log_')]
latest_file = max(files, key=lambda x: os.path.getmtime(os.path.join(log_dir, x)))
return os.path.join(log_dir, latest_file)