From 5851af8f804d845adef973c85c2c8d648cf57b0c Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 11 Nov 2024 22:38:05 +0200 Subject: [PATCH] tick --- crypto/sol/.env | 19 ++-- crypto/sol/app.py | 161 ---------------------------- crypto/sol/modules/SolanaAPI.py | 17 ++- crypto/sol/modules/log_processor.py | 14 ++- crypto/sol/modules/storage.py | 35 +++--- crypto/sol/modules/utils.py | 27 ++++- crypto/sol/modules/webui.py | 31 +++++- prisma/schema.prisma | 10 +- 8 files changed, 115 insertions(+), 199 deletions(-) diff --git a/crypto/sol/.env b/crypto/sol/.env index fac7699..a7bcbe2 100644 --- a/crypto/sol/.env +++ b/crypto/sol/.env @@ -1,3 +1,10 @@ + +#https://dev.d-popov.com/wh +#7keSmTZozjmuX66gd9GBSJYEHnMqsyutWpvuuKtXZKDH +#9U7D916zuQ8qcL9kQZqkcroWhHGho5vD8VNekvztrutN +#3EZkyU9zQRrHPnrNovDiRCA9Yg3wLK35u9cdrcGcszi1 +#7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV + SOLANA_WS_URL="wss://api.mainnet-beta.solana.com" SOLANA_WS_URL2="wss://mainnet.rpcpool.com" @@ -23,11 +30,11 @@ LIQUIDITY_TOKENS=EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v,So1111111111111111 PRIORITY=1 # 0-10, 5 = market cap, 10 twice market cap DO_WATCH_WALLET=True # Niki's to Sync: [PROD] -FOLLOWED_WALLET="3EZkyU9zQRrHPnrNovDiRCA9Yg3wLK35u9cdrcGcszi1" -YOUR_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" -PK=3FxXjNrtEqwAKYj4BpkuLAJPzuKRWykkvjeBYQEVuFqRFWRm9eVcWrrYKbns2M31ESMoASG2WV39w9Dpx532sPUH +#FOLLOWED_WALLET="3EZkyU9zQRrHPnrNovDiRCA9Yg3wLK35u9cdrcGcszi1" +#YOUR_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" +#PK=3FxXjNrtEqwAKYj4BpkuLAJPzuKRWykkvjeBYQEVuFqRFWRm9eVcWrrYKbns2M31ESMoASG2WV39w9Dpx532sPUH # Sync to main [DEV] -#FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" -#YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5" -#PK=5ccrMf3BFFE1HMsXt17btK1tMSNay7aBoY27saPHrqg2JEjxKBmBbxUABD9Jh7Gisf1bhM51oGzWdyLUgHdrUJPw \ No newline at end of file +FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" +YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5" +PK=5ccrMf3BFFE1HMsXt17btK1tMSNay7aBoY27saPHrqg2JEjxKBmBbxUABD9Jh7Gisf1bhM51oGzWdyLUgHdrUJPw \ No newline at end of file diff --git a/crypto/sol/app.py b/crypto/sol/app.py index 22dfd60..5875985 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -205,167 +205,6 @@ async def process_log(log_result): return tr_details - -# async def follow_move_legacy(move): -# global pk -# if pk is None: -# pk = await get_pk() -# your_balances = await SAPI.dex.get_wallet_balances(YOUR_WALLET, doGetTokenName=False) -# your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) -# if your_balance_info is not None: -# # Use the balance -# print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}") -# else: -# print(f"No ballance found for {move['symbol_in']}. Skipping move.") -# await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") -# return - -# your_balance = your_balance_info['amount'] - - -# token_info = SAPI.dex.TOKENS_INFO.get(move['token_in']) -# token_name_in = token_info.get('symbol') or await SAPI.get_token_metadata(move['token_in']) -# token_name_out = SAPI.dex.TOKENS_INFO[move['token_out']].get('symbol') or await SAPI.get_token_metadata_symbol(move['token_out']) - -# if not your_balance: -# msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move." -# logging.warning(msg) -# await telegram_utils.send_telegram_message(msg) -# return - -# if FOLLOW_AMOUNT == 'percentage': -# # Calculate the amount to swap based on the same percentage as the followed move -# amount_to_swap = your_balance * (move['percentage_swapped'] / 100) -# elif FOLLOW_AMOUNT == 'exact': -# amount_to_swap = move['amount_in'] -# else: -# try: -# fixed_amount = float(FOLLOW_AMOUNT) # un USD -# fixed_amount_in_token = fixed_amount / move["token_in_price"] -# amount_to_swap = min(fixed_amount_in_token, your_balance) -# except ValueError: -# msg = f"Move not followed:\nInvalid FOLLOW_AMOUNT '{FOLLOW_AMOUNT}'. Must be 'percentage' or a number." -# logging.warning(msg) -# await telegram_utils.send_telegram_message(msg) -# return - -# amount_to_swap = min(amount_to_swap, your_balance) # Ensure we're not trying to swap more than we have - -# decimals = token_info.get('decimals') -# # Convert to lamports -# # if decimals is 6, then amount = amount * 1e6; if 9, then amount = amount * 1e9 -# amount = int(amount_to_swap * 10**decimals) -# amount = int(amount) -# logging.debug(f"Calculated amount in lamports: {amount}") - -# if your_balance < amount_to_swap: # should not happen -# msg = ( -# f"Warning:\n" -# f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway." -# ) -# logging.warning(msg) -# await telegram_utils.send_telegram_message(msg) -# try: - -# try: -# notification = ( -# f"Initiating move:\n" -# f"Swapping {amount_to_swap:.2f} {token_name_in} for {token_name_out}" -# + (f" ({move['percentage_swapped']:.2f}%)" if 'percentage_swapped' in move else "") -# ) -# # logging.info(notification) -# # error_logger.info(notification) -# await telegram_utils.send_telegram_message(notification) -# except Exception as e: -# logging.error(f"Error sending notification: {e}") - -# for retry in range(3): -# try: -# private_key = Keypair.from_bytes(base58.b58decode(pk)) -# async_client = AsyncClient(SOLANA_WS_URL) -# jupiter = Jupiter(async_client, private_key) -# transaction_data = await jupiter.swap( -# input_mint=move['token_in'], -# output_mint=move['token_out'], -# amount=amount, -# slippage_bps=300, # Increased to 3% -# ) -# logging.info(f"Initiating move. Transaction data:\n {transaction_data}") -# error_logger.info(f"Initiating move. Transaction data:\n {transaction_data}") -# raw_transaction = VersionedTransaction.from_bytes(base64.b64decode(transaction_data)) -# message = raw_transaction.message -# signature = private_key.sign_message(message.to_bytes_versioned()) -# signed_txn = VersionedTransaction.populate(raw_transaction.message, [signature]) -# opts = TxOpts(skip_preflight=False, preflight_commitment=Processed) - -# # send the transaction -# result = await async_client.send_raw_transaction(txn=bytes(signed_txn), opts=opts) - -# transaction_id = json.loads(result.to_json())['result'] -# print(f"Follow Transaction Sent: https://solscan.io/tx/{transaction_id}") -# # append to notification -# notification += f"\n\nTransaction: {transaction_id}" - -# await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}") -# tx_details = await SAPI.get_transaction_details_with_retry(transaction_id) - -# if tx_details is not None: -# break -# else: -# logging.warning(f"Failed to get transaction details for {transaction_id}. Probably transaction failed. Retrying again...") -# await asyncio.sleep(3) -# except Exception as e: -# error_message = f"Move Failed:\n{str(e)}\n{transaction_data}\n{move}" -# logging.error(error_message) -# # log the errors to /logs/errors.log -# error_logger.error(error_message) -# error_logger.exception(e) -# await telegram_utils.send_telegram_message(error_message) -# amount = amount * 0.75 - -# await SAPI.dex.get_wallet_balances(YOUR_WALLET, doGetTokenName=False) - -# try: -# if tx_details is None: -# logging.info(f"Failed to get transaction details for {transaction_id}") -# notification = ( -# f"Move Followed, failed to get transaction details.\n" -# f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['token_in']}) " -# f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n" -# f"\n\nTransaction: {transaction_id}" -# # log_successful_swap () -# ) - -# else: -# notification = ( -# f"Move Followed:\n" -# f"Swapped {amount_to_swap:.6f} {token_name_in} ({move['symbol_in']}) " -# f"(same {move['percentage_swapped']:.2f}% as followed wallet)\n" -# f"for {tx_details['amount_out']:.2f} {token_name_out}" -# # f"Amount In USD: {tr_details['amount_in_USD']}\n" -# f"\n\nTransaction: {transaction_id}" -# ) -# logging.info(notification) -# await telegram_utils.send_telegram_message(notification) -# except Exception as e: -# logging.error(f"Error sending notification: {e}") - -# except Exception as e: -# error_message = f"Swap Follow Error:\n{str(e)}" -# logging.error(error_message) -# # log the errors to /logs/errors.log -# error_logger.error(error_message) -# error_logger.exception(e) \ -# # if error_message contains 'Program log: Error: insufficient funds' -# if 'insufficient funds' in error_message: -# await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.") -# else: -# await telegram_utils.send_telegram_message(error_message) - - -# Helper functions - - async def process_messages(websocket): try: while True: diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py index 78a87ee..edaf12f 100644 --- a/crypto/sol/modules/SolanaAPI.py +++ b/crypto/sol/modules/SolanaAPI.py @@ -84,7 +84,18 @@ class SolanaWS: self.on_message = on_message self.websocket = None self.last_msg_responded = False - + + async def save_log(log): + try: + os.makedirs('./logs', exist_ok=True) + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f") + filename = f"./logs/ws_response_{timestamp}.json" + + with open(filename, 'w') as f: + json.dump(log, f, indent=2) + except Exception as e: + logging.error(f"Error saving RPC log: {e}") + async def connect(self): while True: try: @@ -116,7 +127,7 @@ class SolanaWS: response = await self.websocket.recv() response_data = json.loads(response) self.last_msg_responded = True - + await self.save_log(response_data) if 'result' in response_data: return response_data['result'] elif 'error' in response_data: @@ -158,7 +169,7 @@ class SolanaWS: async def receive_messages(self, one = False): while True: try: - + response = await self.websocket.recv() response_data = json.loads(response) self.last_msg_responded = True diff --git a/crypto/sol/modules/log_processor.py b/crypto/sol/modules/log_processor.py index fdc024b..63af635 100644 --- a/crypto/sol/modules/log_processor.py +++ b/crypto/sol/modules/log_processor.py @@ -34,7 +34,19 @@ async def process_log_file(file_path): if not existing_transaction: # Store the transaction if it doesn't exist - await store_transaction(wallet_id, transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, details) + transaction_data = { + 'wallet_id': wallet_id, + 'type': transaction_type, + 'sell_currency': sell_currency, + 'sell_amount': sell_amount, + 'sell_value': sell_value, + 'buy_currency': buy_currency, + 'buy_amount': buy_amount, + 'buy_value': buy_value, + 'solana_signature': solana_signature, + 'details': details + } + await store_transaction(transaction_data) # Rename the file to append '_saved' new_file_path = file_path.with_name(file_path.stem + "_saved" + file_path.suffix) diff --git a/crypto/sol/modules/storage.py b/crypto/sol/modules/storage.py index 6bc4851..721736a 100644 --- a/crypto/sol/modules/storage.py +++ b/crypto/sol/modules/storage.py @@ -13,25 +13,26 @@ prisma_client = Prisma() async def init_db(): await prisma_client.connect() -async def store_transaction(wallet_id, transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, details=None): +async def store_transaction(transaction_data): """ - Store a transaction record in the database. + Store a transaction record in the database using a dictionary. """ - await prisma_client.transaction.create( - data={ - 'wallet_id': wallet_id, - 'timestamp': datetime.now().isoformat(), - 'type': transaction_type, - 'sell_currency': sell_currency, - 'sell_amount': sell_amount, - 'sell_value': sell_value, - 'buy_currency': buy_currency, - 'buy_amount': buy_amount, - 'buy_value': buy_value, - 'solana_signature': solana_signature, - 'details': json.dumps(details or {}) - } - ) + default_data = { + 'wallet_id': None, + 'timestamp': datetime.now().isoformat(), + 'type': None, + 'sell_currency': None, + 'sell_amount': 0.0, + 'sell_value': 0.0, + 'buy_currency': None, + 'buy_amount': 0.0, + 'buy_value': 0.0, + 'solana_signature': None, + 'details': json.dumps({}), + 'status': prisma_client.transactionStatus.ORIGINAL + } + default_data.update(transaction_data) + await prisma_client.transaction.create(data=default_data) async def update_holdings(wallet_id, currency, amount_change): holding = await prisma_client.holding.find_first( diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py index 8cf6f77..76a93c8 100644 --- a/crypto/sol/modules/utils.py +++ b/crypto/sol/modules/utils.py @@ -1,6 +1,8 @@ # telegram_utils.py import sys import os + +from base58 import b58decode sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import aiohttp @@ -65,7 +67,8 @@ class CSVFormatter(logging.Formatter): record.wallet_address ]) return '' -class Log: + +class Logging: # Set up success logger for accounting CSV def __init__(self): @@ -105,6 +108,26 @@ class Log: }) + +def decode_instruction_data(data: str) -> dict: + try: + # Decode base58 data + decoded = b58decode(data) + + # First byte usually represents instruction type + instruction_type = decoded[0] if decoded else None + + # Rest of the data might contain amounts, token info etc + # Exact parsing depends on the program (Raydium, Orca, etc) + params = decoded[1:] if len(decoded) > 1 else None + + return { + "instruction_type": instruction_type, + "params": params.hex() if params else None + } + except Exception as e: + return {"error": str(e)} + def safe_get_property(info, property_name, default='Unknown'): if not isinstance(info, dict): return str(default) @@ -154,7 +177,7 @@ async def async_safe_call( # Create a global instance of TelegramUtils telegram_utils = TelegramUtils() -log = Log().logger +log = Logging().logger # You can add more Telegram-related methods to the TelegramUtils class if needed diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py index 2b2b307..d37429c 100644 --- a/crypto/sol/modules/webui.py +++ b/crypto/sol/modules/webui.py @@ -12,7 +12,7 @@ import json from config import LIQUIDITY_TOKENS from modules import storage, utils, SolanaAPI -from modules.utils import async_safe_call +from modules.utils import async_safe_call, decode_instruction_data import os import logging from datetime import datetime @@ -75,7 +75,7 @@ def init_app(tr_handler=None): await utils.telegram_utils.send_telegram_message(notification) # Store the notified transaction in the database - await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'],tx_signature) + await storage.store_transaction(wallet, "SWAP", tr['symbol_in'] , tr['amount_in'], tr['value_in_USD'], tr['symbol_out'], tr['amount_out'], tr['value_out_USD'], tx_signature) # Attempt to execute the copytrade transaction try: await SolanaAPI.SAPI.follow_move(tr) @@ -100,12 +100,34 @@ def init_app(tr_handler=None): request_data = request.get_json() if request.is_json else None if not request_data: return jsonify({"error": "No data in request"}), 400 - logger.info(f"Webhook data: {request_data}") + if "description" in request_data[0] and request_data[0]["description"]: + logger.info(request_data[0]["description"]) + else: + logger.info(f"Webhook data: {request_data}") # save dump to /cache/last-webhook-{datetime}.json with open( os.path.join(SolanaAPI.root_path, 'logs', f'wh_{current_time}.json') , 'w') as f: json.dump(request_data, f) + + + if "meta" in request_data[0]: + meta = request_data[0]["meta"] + # Parse inner instructions + for inner_ix in meta.get("innerInstructions", []): + for instruction in inner_ix.get("instructions", []): + decoded = decode_instruction_data(instruction["data"]) + logger.info(f"Instruction data decoded: {decoded}") + + # Example of pattern matching for specific instruction types + if decoded["instruction_type"] == 1: # Example: swap instruction + # Parse parameters based on program type + # Different DEXes will have different parameter layouts + pass + + + + # await process_wh(request_data) # don't wait for the process to finish asyncio.create_task(process_wh(request_data )) @@ -214,7 +236,8 @@ def init_app(tr_handler=None): logging.error(f"Copytrade transaction failed: {e}") # ToDo - probably optimize await SolanaAPI.DEX.save_token_info() - + else: + logger.info("wh transaction is not a swap. skipping...") except Exception as e: logging.error(f"Error processing transaction notification: {str(e)}") # Log the full traceback for debugging diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 4e3465d..ea6b77f 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -75,14 +75,14 @@ enum TransactionStatus { model Transaction { id Int @id @default(autoincrement()) - transactionId String - amount Float - date DateTime - amountUSD Float + transactionId String + amount Float @default(0.0) + date DateTime @default(now()) + amountUSD Float @default(0.0) holdingId Int holding Holding @relation(fields: [holdingId], references: [id]) isClosed Boolean @default(false) - status TransactionStatus + status TransactionStatus @default(ORIGINAL) originalId Int? original Transaction? @relation("OriginalTransaction", fields: [originalId], references: [id]) relatedTo Transaction[] @relation("OriginalTransaction")