This commit is contained in:
Dobromir Popov
2024-11-11 22:38:05 +02:00
parent efd7d8df5a
commit 5851af8f80
8 changed files with 115 additions and 199 deletions

View File

@ -84,7 +84,18 @@ class SolanaWS:
self.on_message = on_message
self.websocket = None
self.last_msg_responded = False
async def save_log(log):
try:
os.makedirs('./logs', exist_ok=True)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename = f"./logs/ws_response_{timestamp}.json"
with open(filename, 'w') as f:
json.dump(log, f, indent=2)
except Exception as e:
logging.error(f"Error saving RPC log: {e}")
async def connect(self):
while True:
try:
@ -116,7 +127,7 @@ class SolanaWS:
response = await self.websocket.recv()
response_data = json.loads(response)
self.last_msg_responded = True
await self.save_log(response_data)
if 'result' in response_data:
return response_data['result']
elif 'error' in response_data:
@ -158,7 +169,7 @@ class SolanaWS:
async def receive_messages(self, one = False):
while True:
try:
response = await self.websocket.recv()
response_data = json.loads(response)
self.last_msg_responded = True

View File

@ -34,7 +34,19 @@ async def process_log_file(file_path):
if not existing_transaction:
# Store the transaction if it doesn't exist
await store_transaction(wallet_id, transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, details)
transaction_data = {
'wallet_id': wallet_id,
'type': transaction_type,
'sell_currency': sell_currency,
'sell_amount': sell_amount,
'sell_value': sell_value,
'buy_currency': buy_currency,
'buy_amount': buy_amount,
'buy_value': buy_value,
'solana_signature': solana_signature,
'details': details
}
await store_transaction(transaction_data)
# Rename the file to append '_saved'
new_file_path = file_path.with_name(file_path.stem + "_saved" + file_path.suffix)

View File

@ -13,25 +13,26 @@ prisma_client = Prisma()
async def init_db():
await prisma_client.connect()
async def store_transaction(wallet_id, transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, details=None):
async def store_transaction(transaction_data):
"""
Store a transaction record in the database.
Store a transaction record in the database using a dictionary.
"""
await prisma_client.transaction.create(
data={
'wallet_id': wallet_id,
'timestamp': datetime.now().isoformat(),
'type': transaction_type,
'sell_currency': sell_currency,
'sell_amount': sell_amount,
'sell_value': sell_value,
'buy_currency': buy_currency,
'buy_amount': buy_amount,
'buy_value': buy_value,
'solana_signature': solana_signature,
'details': json.dumps(details or {})
}
)
default_data = {
'wallet_id': None,
'timestamp': datetime.now().isoformat(),
'type': None,
'sell_currency': None,
'sell_amount': 0.0,
'sell_value': 0.0,
'buy_currency': None,
'buy_amount': 0.0,
'buy_value': 0.0,
'solana_signature': None,
'details': json.dumps({}),
'status': prisma_client.transactionStatus.ORIGINAL
}
default_data.update(transaction_data)
await prisma_client.transaction.create(data=default_data)
async def update_holdings(wallet_id, currency, amount_change):
holding = await prisma_client.holding.find_first(

View File

@ -1,6 +1,8 @@
# telegram_utils.py
import sys
import os
from base58 import b58decode
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import aiohttp
@ -65,7 +67,8 @@ class CSVFormatter(logging.Formatter):
record.wallet_address
])
return ''
class Log:
class Logging:
# Set up success logger for accounting CSV
def __init__(self):
@ -105,6 +108,26 @@ class Log:
})
def decode_instruction_data(data: str) -> dict:
try:
# Decode base58 data
decoded = b58decode(data)
# First byte usually represents instruction type
instruction_type = decoded[0] if decoded else None
# Rest of the data might contain amounts, token info etc
# Exact parsing depends on the program (Raydium, Orca, etc)
params = decoded[1:] if len(decoded) > 1 else None
return {
"instruction_type": instruction_type,
"params": params.hex() if params else None
}
except Exception as e:
return {"error": str(e)}
def safe_get_property(info, property_name, default='Unknown'):
if not isinstance(info, dict):
return str(default)
@ -154,7 +177,7 @@ async def async_safe_call(
# Create a global instance of TelegramUtils
telegram_utils = TelegramUtils()
log = Log().logger
log = Logging().logger
# You can add more Telegram-related methods to the TelegramUtils class if needed

View File

@ -12,7 +12,7 @@ import json
from config import LIQUIDITY_TOKENS
from modules import storage, utils, SolanaAPI
from modules.utils import async_safe_call
from modules.utils import async_safe_call, decode_instruction_data
import os
import logging
from datetime import datetime
@ -75,7 +75,7 @@ def init_app(tr_handler=None):
await utils.telegram_utils.send_telegram_message(notification)
# Store the notified transaction in the database
await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature)
await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'], tx_signature)
# Attempt to execute the copytrade transaction
try:
await SolanaAPI.SAPI.follow_move(tr)
@ -100,12 +100,34 @@ def init_app(tr_handler=None):
request_data = request.get_json() if request.is_json else None
if not request_data:
return jsonify({"error": "No data in request"}), 400
logger.info(f"Webhook data: {request_data}")
if "description" in request_data[0] and request_data[0]["description"]:
logger.info(request_data[0]["description"])
else:
logger.info(f"Webhook data: {request_data}")
# save dump to /cache/last-webhook-{datetime}.json
with open( os.path.join(SolanaAPI.root_path, 'logs', f'wh_{current_time}.json') , 'w') as f:
json.dump(request_data, f)
if "meta" in request_data[0]:
meta = request_data[0]["meta"]
# Parse inner instructions
for inner_ix in meta.get("innerInstructions", []):
for instruction in inner_ix.get("instructions", []):
decoded = decode_instruction_data(instruction["data"])
logger.info(f"Instruction data decoded: {decoded}")
# Example of pattern matching for specific instruction types
if decoded["instruction_type"] == 1: # Example: swap instruction
# Parse parameters based on program type
# Different DEXes will have different parameter layouts
pass
# await process_wh(request_data)
# don't wait for the process to finish
asyncio.create_task(process_wh(request_data ))
@ -214,7 +236,8 @@ def init_app(tr_handler=None):
logging.error(f"Copytrade transaction failed: {e}")
# ToDo - probably optimize
await SolanaAPI.DEX.save_token_info()
else:
logger.info("wh transaction is not a swap. skipping...")
except Exception as e:
logging.error(f"Error processing transaction notification: {str(e)}")
# Log the full traceback for debugging