diff --git a/.vscode/launch.json b/.vscode/launch.json index 8906574..b7e847f 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -39,8 +39,8 @@ "console": "integratedTerminal", "internalConsoleOptions": "neverOpen", "env": { - "NODE_ENV": "demo" - "OPENAI_API_KEY": + "NODE_ENV": "demo", + "OPENAI_API_KEY":"" }, "skipFiles": [ "/**" @@ -69,7 +69,13 @@ "program": "${file}" }, { - "name": "Python Debugger: Python File with Conda", + "name": "py: Sol app.py", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/crypto/sol/app.py", + }, + { + "name": "Python Debugger: Python File with Conda (py)", "type": "debugpy", "request": "launch", "program": "${file}", diff --git a/app_data.db b/app_data.db new file mode 100644 index 0000000..1d388bb Binary files /dev/null and b/app_data.db differ diff --git a/crypto/sol/.env b/crypto/sol/.env index 8f0f868..a0ddf33 100644 --- a/crypto/sol/.env +++ b/crypto/sol/.env @@ -1,19 +1,26 @@ - -SOLANA_WS_URL="wss://api.mainnet-beta.solana.com" -SOLANA_WS_URL2="wss://mainnet.rpcpool.com" -SOLANA_HTTP_URL="https://api.mainnet-beta.solana.com" - -DEVELOPER_CHAT_ID="777826553" -TELEGRAM_BOT_TOKEN="6805059978:AAHNJKuOeazMSJHc3-BXRCsFfEVyFHeFnjw" - -DISPLAY_CURRENCY=USD - -# Niki's to Sync: [PROD] -FOLLOWED_WALLET="7keSmTZozjmuX66gd9GBSJYEHnMqsyutWpvuuKtXZKDH" -YOUR_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" -PK=3FxXjNrtEqwAKYj4BpkuLAJPzuKRWykkvjeBYQEVuFqRFWRm9eVcWrrYKbns2M31ESMoASG2WV39w9Dpx532sPUH - -# Sync to main [DEV] -#FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" -#YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5" + +SOLANA_WS_URL="wss://api.mainnet-beta.solana.com" +SOLANA_WS_URL2="wss://mainnet.rpcpool.com" +SOLANA_HTTP_URL="https://api.mainnet-beta.solana.com" + +# prod, @kevin_ai_robot: +BOT_NAME="Solower" +DEVELOPER_CHAT_ID="777826553" # https://api.telegram.org/bot{token}/getUpdates +TELEGRAM_BOT_TOKEN="6805059978:AAHNJKuOeazMSJHc3-BXRCsFfEVyFHeFnjw" + +# dev, @artitherobot: +BOT_NAME="DEV" +DEVELOPER_CHAT_ID="777826553" +TELEGRAM_BOT_TOKEN="6749075936:AAHUHiPTDEIu6JH7S2fQdibwsu6JVG3FNG0" + +DISPLAY_CURRENCY=USD + +# Niki's to Sync: [PROD] +FOLLOWED_WALLET="7keSmTZozjmuX66gd9GBSJYEHnMqsyutWpvuuKtXZKDH" +YOUR_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" +PK=3FxXjNrtEqwAKYj4BpkuLAJPzuKRWykkvjeBYQEVuFqRFWRm9eVcWrrYKbns2M31ESMoASG2WV39w9Dpx532sPUH + +# Sync to main [DEV] +#FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" +#YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5" #PK=5ccrMf3BFFE1HMsXt17btK1tMSNay7aBoY27saPHrqg2JEjxKBmBbxUABD9Jh7Gisf1bhM51oGzWdyLUgHdrUJPw \ No newline at end of file diff --git a/crypto/sol/.env.example b/crypto/sol/.env.example deleted file mode 100644 index e949837..0000000 --- a/crypto/sol/.env.example +++ /dev/null @@ -1,15 +0,0 @@ - -SOLANA_WS_URL="wss://api.mainnet-beta.solana.com" -SOLANA_WS_URL2="wss://mainnet.rpcpool.com" -SOLANA_HTTP_URL="https://api.mainnet-beta.solana.com" -DEVELOPER_CHAT_ID="777826553" -# Niki's -# FOLLOWED_WALLET="9U7D916zuQ8qcL9kQZqkcroWhHGho5vD8VNekvztrutN" -# My test Brave sync wallet -FOLLOWED_WALLET="7QXGLRjvyFAmxdRaP9Wk18KwWTMfspF4Na2sr3o3PzxV" - -TELEGRAM_BOT_TOKEN="6805059978:AAHNJKuOeazMSJHc3-BXRCsFfEVyFHeFnjw" -DISPLAY_CURRENCY=USD - -YOUR_WALLET="65nzyZXTLC81MthTo52a2gRJjqryTizWVqpK2fDKLye5" -PK={} \ No newline at end of file diff --git a/crypto/sol/app.py b/crypto/sol/app.py index b7ad51d..f3b4c56 100644 --- a/crypto/sol/app.py +++ b/crypto/sol/app.py @@ -5,7 +5,6 @@ from flask import Flask, render_template, request, jsonify from solana.rpc.async_api import AsyncClient from solana.transaction import Signature from solana.rpc.websocket_api import connect -from solana.rpc.types import TokenAccountOpts, TxOpts from solana.rpc.commitment import Confirmed, Processed from solana.transaction import Transaction from spl.token.client import Token @@ -24,8 +23,8 @@ from solders.instruction import CompiledInstruction from solders import message from jupiter_python_sdk.jupiter import Jupiter, Jupiter_DCA from dexscreener import DexscreenerClient -from telegram import Bot -from telegram.constants import ParseMode +from solana.rpc.types import TokenAccountOpts, TxOpts + import datetime import logging from logging.handlers import RotatingFileHandler @@ -41,6 +40,9 @@ from typing import List, Dict, Any, Tuple import random +from modules.webui import init_app +from modules.storage import init_db, store_transaction + app = Flask(__name__) # config = load_config() @@ -158,12 +160,6 @@ async def retry_last_log(): # Create the bot with the custom connection pool bot = None -# Token addresses (initialize with some known tokens) -TOKEN_ADDRESSES = { - "SOL": "So11111111111111111111111111111111111111112", - "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", - "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og", -} TOKENS_INFO = {} try: @@ -173,208 +169,27 @@ except Exception as e: logging.error(f"Error loading token info: {str(e)}") # # # # # # # # # # TELEGRAM # # # # # # # # # # -async def send_telegram_message(message): +if not telegram_utils.bot: try: - await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=message, parse_mode=ParseMode.HTML) - logging.info(f"Telegram message sent: {message}") - # logging.info(f"Telegram message dummy sent: {message}") + asyncio.run(telegram_utils.initialize()) except Exception as e: - logging.error(f"Error sending Telegram message: {str(e)}") + logging.error(f"Error initializing Telegram bot: {str(e)}") +# async def telegram_utils.send_telegram_message(message): +# try: +# await bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) +# logging.info(f"Telegram message sent: {message}") +# # logging.info(f"Telegram message dummy sent: {message}") +# except Exception as e: +# logging.error(f"Error sending Telegram message: {str(e)}") +# # # # # # # # # # DATABASE # # # # # # # # # # + + + # # # # # # # # # # CRYPTO PUBLIC # # # # # # # # # # -async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: - global TOKENS_INFO - - # Skip for USD - prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"} - remaining_tokens = [addr for addr in token_addresses if addr not in prices] - - # Try CoinGecko - coingecko_prices = await get_prices_from_coingecko(remaining_tokens) - prices.update(coingecko_prices) - - - # For remaining missing tokens, try Jupiter - missing_tokens = set(remaining_tokens) - set(prices.keys()) - if missing_tokens: - jupiter_prices = await get_prices_from_jupiter(list(missing_tokens)) - prices.update(jupiter_prices) - - - # For tokens not found in CoinGecko, use DexScreener - missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys()) - if missing_tokens: - dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens)) - prices.update(dexscreener_prices) - - # For remaining missing tokens, try Raydium - missing_tokens = set(remaining_tokens) - set(prices.keys()) - if missing_tokens: - raydium_prices = await get_prices_from_raydium(list(missing_tokens)) - prices.update(raydium_prices) - - # For remaining missing tokens, try Orca - missing_tokens = set(remaining_tokens) - set(prices.keys()) - if missing_tokens: - orca_prices = await get_prices_from_orca(list(missing_tokens)) - prices.update(orca_prices) - - # If any tokens are still missing, set their prices to 0 - for token in set(token_addresses) - set(prices.keys()): - prices[token] = 0.0 - logging.warning(f"Price not found for token {token}. Setting to 0.") - - for token, price in prices.items(): - token_info = TOKENS_INFO.setdefault(token, {}) - if 'symbol' not in token_info: - token_info['symbol'] = await get_token_metadata_symbol(token) - token_info['price'] = price - - return prices - - -async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]: - base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana" - prices = {} - - async def fetch_single_price(session, address): - params = { - "contract_addresses": address, - "vs_currencies": DISPLAY_CURRENCY.lower() - } - try: - async with session.get(base_url, params=params) as response: - if response.status == 200: - data = await response.json() - if address in data and DISPLAY_CURRENCY.lower() in data[address]: - return address, data[address][DISPLAY_CURRENCY.lower()] - else: - logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}") - return address, None - - async with aiohttp.ClientSession() as session: - tasks = [fetch_single_price(session, address) for address in token_addresses] - results = await asyncio.gather(*tasks) - - for address, price in results: - if price is not None: - prices[address] = price - - return prices - -async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]: - base_url = "https://api.dexscreener.com/latest/dex/tokens/" - prices = {} - - try: - async with aiohttp.ClientSession() as session: - tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] - results = await asyncio.gather(*tasks) - - for address, result in zip(token_addresses, results): - if result and 'pairs' in result and result['pairs']: - pair = result['pairs'][0] # Use the first pair (usually the most liquid) - prices[address] = float(pair['priceUsd']) - else: - logging.warning(f"No price data found on DexScreener for token {address}") - except Exception as e: - logging.error(f"Error fetching token prices from DexScreener: {str(e)}") - - return prices - -async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]: - url = "https://price.jup.ag/v4/price" - params = { - "ids": ",".join(token_addresses) - } - prices = {} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url, params=params) as response: - if response.status == 200: - data = await response.json() - for address, price_info in data.get('data', {}).items(): - if 'price' in price_info: - prices[address] = float(price_info['price']) - else: - logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching token prices from Jupiter: {str(e)}") - return prices - -# New function for Raydium -async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]: - url = "https://api.raydium.io/v2/main/price" - prices = {} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - for address in token_addresses: - if address in data: - prices[address] = float(data[address]) - else: - logging.error(f"Failed to get token prices from Raydium. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching token prices from Raydium: {str(e)}") - return prices - -# New function for Orca -async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]: - url = "https://api.orca.so/allTokens" - prices = {} - - try: - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - for token_info in data: - if token_info['mint'] in token_addresses: - prices[token_info['mint']] = float(token_info['price']) - else: - logging.error(f"Failed to get token prices from Orca. Status: {response.status}") - except Exception as e: - logging.error(f"Error fetching token prices from Orca: {str(e)}") - return prices - - -async def fetch_token_data(session, url): - try: - async with session.get(url) as response: - if response.status == 200: - return await response.json() - else: - logging.error(f"Failed to fetch data from {url}. Status: {response.status}") - return None - except Exception as e: - logging.error(f"Error fetching data from {url}: {str(e)}") - return None - -async def get_sol_price() -> float: - url = f"https://api.coingecko.com/api/v3/simple/price?ids=solana&vs_currencies={DISPLAY_CURRENCY.lower()}" - async with aiohttp.ClientSession() as session: - async with session.get(url) as response: - if response.status == 200: - data = await response.json() - return data['solana'][DISPLAY_CURRENCY.lower()] - else: - logging.error(f"Failed to get SOL price from CoinGecko. Status: {response.status}") - return await get_sol_price_from_dexscreener() - -async def get_sol_price_from_dexscreener() -> float: - sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address - prices = await get_prices_from_dexscreener([sol_address]) - return prices.get(sol_address, 0.0) - # # # # # # # # # # SOLANA BLOCKCHAIN # # # # # # # # # # @@ -439,44 +254,9 @@ from spl.token.async_client import AsyncToken from spl.token.constants import TOKEN_PROGRAM_ID from borsh_construct import String, CStruct -async def get_token_metadata_symbol(mint_address): - global TOKENS_INFO + + - if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]: - return TOKENS_INFO[mint_address].get('symbol') - - try: - account_data_result = await solana_jsonrpc("getAccountInfo", mint_address) - if 'value' in account_data_result and 'data' in account_data_result['value']: - account_data_data = account_data_result['value']['data'] - if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: - account_data_info = account_data_data['parsed']['info'] - if 'decimals' in account_data_info: - if mint_address in TOKENS_INFO: - TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals'] - else: - TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']} - if 'tokenName' in account_data_info: - if mint_address in TOKENS_INFO: - TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName'] - else: - TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']} - - metadata = await get_token_metadata(mint_address) - if metadata: - if mint_address in TOKENS_INFO: - TOKENS_INFO[mint_address].update(metadata) - else: - TOKENS_INFO[mint_address] = metadata - await save_token_info() - # TOKENS_INFO[mint_address] = metadata - # return metadata.get('symbol') or metadata.get('name') - return TOKENS_INFO[mint_address].get('symbol') - except Exception as e: - logging.error(f"Error fetching token name for {mint_address}: {str(e)}") - return None - - METADATA_STRUCT = CStruct( "update_authority" / String, "mint" / String, @@ -579,80 +359,6 @@ async def get_token_metadata(mint_address): return None -async def get_wallet_balances(wallet_address, doGetTokenName=True): - balances = {} - logging.info(f"Getting balances for wallet: {wallet_address}") - global TOKENS_INFO - try: - response = await solana_client.get_token_accounts_by_owner_json_parsed( - Pubkey.from_string(wallet_address), - opts=TokenAccountOpts( - program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") - ), - commitment=Confirmed - ) - - if response.value: - for account in response.value: - try: - parsed_data = account.account.data.parsed - if isinstance(parsed_data, dict) and 'info' in parsed_data: - info = parsed_data['info'] - if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info: - mint = info['mint'] - decimals = info['tokenAmount']['decimals'] - amount = float(info['tokenAmount']['amount'])/10**decimals - if amount > 0: - if mint in TOKENS_INFO: - token_name = TOKENS_INFO[mint].get('symbol') - elif doGetTokenName: - token_name = await get_token_metadata_symbol(mint) or 'N/A' - # sleep for 1 second to avoid rate limiting - await asyncio.sleep(2) - - TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals) - TOKENS_INFO[mint]['decimals'] = decimals - balances[mint] = { - 'name': token_name or 'N/A', - 'address': mint, - 'amount': amount, - 'decimals': decimals - } - # sleep for 1 second to avoid rate limiting - logging.debug(f"Account balance for {token_name} ({mint}): {amount}") - else: - logging.warning(f"Unexpected data format for account: {account}") - except Exception as e: - logging.error(f"Error parsing account data: {str(e)}") - - sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) - if sol_balance.value is not None: - balances['SOL'] = { - 'name': 'SOL', - 'address': 'SOL', - 'amount': sol_balance.value / 1e9 - } - else: - logging.warning(f"SOL balance response missing for wallet: {wallet_address}") - - except Exception as e: - logging.error(f"Error getting wallet balances: {str(e)}") - logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") - return balances - -async def convert_balances_to_currency(balances , sol_price): - converted_balances = {} - for address, info in balances.items(): - converted_balance = info.copy() # Create a copy of the original info - if info['name'] == 'SOL': - converted_balance['value'] = info['amount'] * sol_price - elif address in TOKEN_PRICES: - converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] - else: - converted_balance['value'] = None # Price not available - logging.warning(f"Price not available for token {info['name']} ({address})") - converted_balances[address] = converted_balance - return converted_balances async def get_swap_transaction_details(tx_signature_str): @@ -697,244 +403,10 @@ async def get_swap_transaction_details(tx_signature_str): return None - -# # # RAW Solana API RPC # # # - -#this is the meat of the application -async def get_transaction_details_rpc(tx_signature, readfromDump=False): - global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO - try: - if readfromDump and os.path.exists('./logs/transation_details.json'): - with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details - transaction_details = json.load(f) - return transaction_details - else: - transaction_details = await solana_jsonrpc("getTransaction", tx_signature) - with open('./logs/transation_details.json', 'w') as f: - json.dump(transaction_details, f, indent=2) - - if transaction_details is None: - logging.error(f"Error fetching transaction details for {tx_signature}") - return None - - # Initialize default result structure - parsed_result = { - "order_id": None, - "token_in": None, - "token_out": None, - "amount_in": 0, - "amount_out": 0, - "amount_in_USD": 0, - "amount_out_USD": 0, - "percentage_swapped": 0 - } - - # Extract order_id from logs - log_messages = transaction_details.get("meta", {}).get("logMessages", []) - for log in log_messages: - if "order_id" in log: - parsed_result["order_id"] = log.split(":")[2].strip() - break - - # Extract token transfers from innerInstructions - inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) - for instruction_set in inner_instructions: - for instruction in instruction_set.get('instructions', []): - if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked': - info = instruction['parsed']['info'] - mint = info['mint'] - amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals - - # Determine which token is being swapped in and out based on zero balances - if parsed_result["token_in"] is None and amount > 0: - parsed_result["token_in"] = mint - parsed_result["amount_in"] = amount - - - if parsed_result["token_in"] is None or parsed_result["token_out"] is None: - # if we've failed to extract token_in and token_out from the transaction details, try a second method - inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) - transfers = [] - - for instruction_set in inner_instructions: - for instruction in instruction_set.get('instructions', []): - if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']: - info = instruction['parsed']['info'] - amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount']) - decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0 - adjusted_amount = amount / (10 ** decimals) - # adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0)) - transfers.append({ - 'mint': info.get('mint'), - 'amount': adjusted_amount, - 'source': info['source'], - 'destination': info['destination'] - }) - - # Identify token_in and token_out - if len(transfers) >= 2: - parsed_result["token_in"] = transfers[0]['mint'] - parsed_result["amount_in"] = transfers[0]['amount'] - parsed_result["token_out"] = transfers[-1]['mint'] - parsed_result["amount_out"] = transfers[-1]['amount'] - - # If mint is not provided, query the Solana network for the account data - if parsed_result["token_in"] is None or parsed_result["token_out"] is None: - #for transfer in transfers: - # do only first and last transfer - for transfer in [transfers[0], transfers[-1]]: - if transfer['mint'] is None: - # Query the Solana network for the account data - account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source']) - - if 'value' in account_data_result and 'data' in account_data_result['value']: - account_data_value = account_data_result['value'] - account_data_data = account_data_value['data'] - if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: - account_data_info = account_data_data['parsed']['info'] - if 'mint' in account_data_info: - transfer['mint'] = account_data_info['mint'] - if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]: - await get_token_metadata_symbol(transfer['mint']) - # get actual prices - current_price = await get_token_prices([transfer['mint']]) - - if parsed_result["token_in"] is None: - parsed_result["token_in"] = transfer['mint'] - parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol'] - parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] - parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']]) - elif parsed_result["token_out"] is None: - parsed_result["token_out"] = transfer['mint'] - parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol'] - parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] - parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price'] - - pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', []) - for balance in pre_balalnces: - if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET: - parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals'] - break - - - # Calculate percentage swapped - try: - if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0: - parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100 - else: - # calculate based on total wallet value: FOLLOWED_WALLET_VALUE - parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100 - except Exception as e: - logging.error(f"Error calculating percentage swapped: {e}") - - return parsed_result - - except requests.exceptions.RequestException as e: - print("Error fetching transaction details:", e) - - -async def solana_jsonrpc(method, params = None, jsonParsed = True): - # target json example: - # data = { - # "jsonrpc": "2.0", - # "id": 1, - # "method": "getTransaction", - # "params": [ - # tx_signature, - # { - # "encoding": "jsonParsed", - # "maxSupportedTransactionVersion": 0 - # } - # ] - # } - # if param is not array, make it array - if not isinstance(params, list): - params = [params] - - data = { - "jsonrpc": "2.0", - "id": 1, - "method": method, - "params": params or [] - } - data["params"].append({"maxSupportedTransactionVersion": 0}) - if jsonParsed: - data["params"][1]["encoding"] = "jsonParsed" - - - try: - # url = 'https://solana.drpc.org' - response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) - response.raise_for_status() # Raises an error for bad responses - result = response.json() - if not 'result' in result or 'error' in result: - print("Error fetching data from Solana RPC:", result) - return None - return result['result'] - except Exception as e: - logging.error(f"Error fetching data from Solana RPC: {e}") - return None - - # # # # # # # # # # Functionality # # # # # # # # # # -async def list_initial_wallet_states(): - global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES - global TOKENS_INFO # new - - followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) - your_wallet_balances = await get_wallet_balances(YOUR_WALLET) - - all_token_addresses = list(set(followed_wallet_balances.keys()) | - set(your_wallet_balances.keys()) | - set(TOKEN_ADDRESSES.values())) - - TOKEN_PRICES = await get_token_prices(all_token_addresses) - sol_price = await get_sol_price() - - followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) - your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) - - - TOKEN_ADDRESSES = { - address: info for address, - info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0 - } - logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") - - followed_wallet_state = [] - FOLLOWED_WALLET_VALUE = 0 - for address, info in followed_converted_balances.items(): - if info['value'] is not None and info['value'] > 0: - followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})") - FOLLOWED_WALLET_VALUE += info['value'] - - your_wallet_state = [] - YOUR_WALLET_VALUE = 0 - for address, info in your_converted_balances.items(): - if info['value'] is not None and info['value'] > 0: - your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") - YOUR_WALLET_VALUE += info['value'] - - message = ( - f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" - f"Followed Wallet ({FOLLOWED_WALLET}):\n" - f"{chr(10).join(followed_wallet_state)}\n" - f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" - f"Your Wallet ({YOUR_WALLET}):\n" - f"{chr(10).join(your_wallet_state)}\n" - f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" - f"Monitored Tokens:\n" - f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}" - ) - - logging.info(message) - await send_telegram_message(message) - - # save token info to file - await save_token_info() def safe_get_property(info, property_name, default='Unknown'): if not isinstance(info, dict): @@ -952,7 +424,7 @@ async def get_transaction_details_with_retry(transaction_id, retry_delay = 5, ma # query every 5 seconds for the transaction details until not None or 30 seconds for _ in range(max_retries): try: - tx_details = await get_transaction_details_rpc(transaction_id) + tx_details = await solanaAPI.get_transaction_details_rpc(transaction_id) if tx_details is not None: break except Exception as e: @@ -1080,14 +552,15 @@ async def process_log(log_result): f"{tr_details['amount_in_USD']:.2f} worth of {tr_details['symbol_in']} ({tr_details['percentage_swapped']:.2f}% ) swapped for " # ({tr_details['token_in']}) ({tr_details['token_out']}) f"{tr_details['symbol_out']} \n" ) - await send_telegram_message(message_text) + await telegram_utils.send_telegram_message(message_text) await follow_move(tr_details) await save_token_info() except Exception as e: logging.error(f"Error aquiring log details and following: {e}") - error_logger.info(f"Error aquiring log details and following:\n {tr_details}") await send_telegram_message(f"Not followed! Error following move.") + + except Exception as e: logging.error(f"Error processing log: {e}") @@ -1136,7 +609,6 @@ def _get_pre_balance(transaction_details: Dict[str, Any], token: str) -> float: async def follow_move(move): - tx_details = None your_balances = await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) your_balance_info = next((balance for balance in your_balances.values() if balance['address'] == move['token_in']), None) if your_balance_info is not None: @@ -1144,7 +616,7 @@ async def follow_move(move): print(f"Your balance: {your_balance_info['amount']} {move['symbol_in']}") else: print("No ballance found for {move['symbol_in']}. Skipping move.") - await send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") + await telegram_utils.send_telegram_message(f"No ballance found for {move['symbol_in']}. Skipping move.") return your_balance = your_balance_info['amount'] @@ -1152,12 +624,12 @@ async def follow_move(move): token_info = TOKENS_INFO.get(move['token_in']) token_name_in = token_info.get('symbol') or await get_token_metadata(move['token_in']) - token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await get_token_metadata_symbol(move['token_out']) + token_name_out = TOKENS_INFO[move['token_out']].get('symbol') or await solanaAPI.get_token_metadata_symbol(move['token_out']) if not your_balance: msg = f"Move not followed:\nNo balance found for token {move['symbol_in']}. Cannot follow move." logging.warning(msg) - await send_telegram_message(msg) + await telegram_utils.send_telegram_message(msg) return if FOLLOW_AMOUNT == 'percentage': @@ -1192,7 +664,7 @@ async def follow_move(move): f"Insufficient balance: {your_balance:.6f} {token_name_in}. We want to swap {amount_to_swap:.6f}\n({move['symbol_in']}, decimals {token_info.get('decimals')} amount {amount}).\n This will probably fail. But we will try anyway." ) logging.warning(msg) - await send_telegram_message(msg) + await telegram_utils.send_telegram_message(msg) try: try: @@ -1202,7 +674,7 @@ async def follow_move(move): ) # logging.info(notification) # error_logger.info(notification) - # await send_telegram_message(notification) + # await telegram_utils.send_telegram_message(notification) except Exception as e: logging.error(f"Error sending notification: {e}") @@ -1232,7 +704,7 @@ async def follow_move(move): # append to notification notification += f"\n\nTransaction: {transaction_id}" - await send_telegram_message(f"Follow Transaction Sent: {transaction_id}") + await telegram_utils.send_telegram_message(f"Follow Transaction Sent: {transaction_id}") tx_details = await get_transaction_details_with_retry(transaction_id) if tx_details is not None: @@ -1246,7 +718,7 @@ async def follow_move(move): # log the errors to /logs/errors.log error_logger.error(error_message) error_logger.exception(e) - await send_telegram_message(error_message) + await telegram_utils.send_telegram_message(error_message) amount = amount * 0.75 await get_wallet_balances(YOUR_WALLET, doGetTokenName=False) @@ -1273,11 +745,6 @@ async def follow_move(move): ) logging.info(notification) await send_telegram_message(notification) - - # Log successful swap details - success_logger_accounting_csv.info( - f"{move['symbol_in']},{move['symbol_out']},{amount_to_swap:.6f},{tx_details['amount_out']:.6f},{move['amount_in_USD']:.2f},{tx_details['amount_out_USD']:.2f},{move['percentage_swapped']:.2f}" - ) except Exception as e: logging.error(f"Error sending notification: {e}") @@ -1289,9 +756,9 @@ async def follow_move(move): error_logger.exception(e) \ # if error_message contains 'Program log: Error: insufficient funds' if 'insufficient funds' in error_message: - await send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.") + await telegram_utils.send_telegram_message("Insufficient funds. Cannot follow move. Please check your balance.") else: - await send_telegram_message(error_message) + await telegram_utils.send_telegram_message(error_message) # Helper functions @@ -1379,7 +846,7 @@ async def wallet_watch_loop(): logger.error(f"An unexpected error occurred: {e}") await unsubscribe(websocket, subscription_id) - # await send_telegram_message("reconnecting...") + await send_telegram_message("reconnecting...") logger.info(f"Attempting to reconnect in {reconnect_delay} seconds...") websocket.close() except Exception as e: @@ -1414,7 +881,7 @@ async def subscribe(websocket): return None except websockets.exceptions.ConnectionClosedError as e: logger.error(f"Connection closed unexpectedly: {e}") - # await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") + await send_telegram_message("Connection to Solana network was closed. Not listening for transactions right now. Attempting to reconnect...") await websocket.close() return None except Exception as e: @@ -1476,53 +943,58 @@ async def check_PK(): if not pk: logging.error("Private key not found in environment variables. Will not be able to sign transactions.") # send TG warning message - await send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.") + await telegram_utils.send_telegram_message("Warning: Private key not found in environment variables. Will not be able to sign transactions.") +solanaAPI = SolanaAPI(process_transaction_callback=process_log) + async def main(): - global bot, PROCESSING_LOG - # Initialize Telegram Bot - # Create a custom connection pool - conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit - timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout + global solanaAPI, bot, PROCESSING_LOG - bot = Bot(TELEGRAM_BOT_TOKEN) # , request=aiohttp.ClientSession(connector=conn_pool, timeout=timeout).request) - await send_telegram_message("Solana Agent Started. Connecting to mainnet...") + await telegram_utils.send_telegram_message("Solana Agent Started. Connecting to mainnet...") await check_PK() # new: restart wallet_watch_loop every hour - while True: - wallet_watch_task = asyncio.create_task(wallet_watch_loop()) + await solanaAPI.wallet_watch_loop() + + # while True: + # wallet_watch_task = asyncio.create_task(solanaAPI.wallet_watch_loop()) - try: - # Wait for an hour or until the task completes, whichever comes first - await asyncio.wait_for(wallet_watch_task, timeout=3600) - except asyncio.TimeoutError: - # If an hour has passed, cancel the task if not PROCESSING - if PROCESSING_LOG: - logging.info("wallet_watch_loop is processing logs. Will not restart.") - await send_telegram_message("wallet_watch_loop is processing logs. Will not restart.") - else: - wallet_watch_task.cancel() - try: - await wallet_watch_task - except asyncio.CancelledError: - logging.info("wallet_watch_loop was cancelled after running for an hour") - except Exception as e: - logging.error(f"Error in wallet_watch_loop: {str(e)}") - await send_telegram_message(f"Error in wallet_watch_loop: {str(e)}") + # try: + # # Wait for an hour or until the task completes, whichever comes first + # await asyncio.wait_for(wallet_watch_task, timeout=3600) + # except asyncio.TimeoutError: + # # If an hour has passed, cancel the task if not PROCESSING + # if PROCESSING_LOG: + # logging.info("wallet_watch_loop is processing logs. Will not restart.") + # await telegram_utils.send_telegram_message("wallet_watch_loop is processing logs. Will not restart.") + # else: + # wallet_watch_task.cancel() + # try: + # await wallet_watch_task + # except asyncio.CancelledError: + # logging.info("wallet_watch_loop was cancelled after running for an hour") + # except Exception as e: + # logging.error(f"Error in wallet_watch_loop: {str(e)}") + # await telegram_utils.send_telegram_message(f"Error in wallet_watch_loop: {str(e)}") - logging.info("Restarting wallet_watch_loop") - await send_telegram_message("Restarting wallet_watch_loop") + # logging.info("Restarting wallet_watch_loop") + # await telegram_utils.send_telegram_message("Restarting wallet_watch_loop") + + async def run_flask(): + # loop = asyncio.get_running_loop() + # await loop.run_in_executor(None, lambda: app.run(debug=False, port=3001, use_reloader=False)) + app = init_app() loop = asyncio.get_running_loop() await loop.run_in_executor(None, lambda: app.run(debug=False, port=3001, use_reloader=False)) async def run_all(): await asyncio.gather( + init_db(), main(), run_flask() ) diff --git a/crypto/sol/config.py b/crypto/sol/config.py new file mode 100644 index 0000000..e708df8 --- /dev/null +++ b/crypto/sol/config.py @@ -0,0 +1,59 @@ +# config.py + +import os +import logging +from dotenv import load_dotenv +from logging.handlers import RotatingFileHandler + +# Load environment variables +load_dotenv() +load_dotenv('.env.secret') + +# Configuration +DEVELOPER_CHAT_ID = os.getenv("DEVELOPER_CHAT_ID") +FOLLOWED_WALLET = os.getenv("FOLLOWED_WALLET") +YOUR_WALLET = os.getenv("YOUR_WALLET") +TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN") +SOLANA_WS_URL = os.getenv("SOLANA_WS_URL") +SOLANA_HTTP_URL = os.getenv("SOLANA_HTTP_URL") +DISPLAY_CURRENCY = os.getenv('DISPLAY_CURRENCY', 'USD') +BOT_NAME = os.getenv("BOT_NAME") + +# Token addresses (initialize with some known tokens) +TOKEN_ADDRESSES = { + "SOL": "So11111111111111111111111111111111111111112", + "USDC": "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v", + "TARD": "4nfn86ssbv7wiqcsw7bpvn46k24jhe334fudtyxhp1og", +} + +# Logging configuration +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +# Set up error logger +log_dir = './logs' +log_file = os.path.join(log_dir, 'error.log') +os.makedirs(log_dir, exist_ok=True) +error_file_handler = RotatingFileHandler( + log_file, + maxBytes=10*1024*1024, + backupCount=5 +) +error_file_handler.setLevel(logging.ERROR) +error_file_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')) +error_logger = logging.getLogger('error_logger') +error_logger.setLevel(logging.ERROR) +error_logger.addHandler(error_file_handler) + +# Function to get all configuration +def get_config(): + return { + "DEVELOPER_CHAT_ID": DEVELOPER_CHAT_ID, + "FOLLOWED_WALLET": FOLLOWED_WALLET, + "YOUR_WALLET": YOUR_WALLET, + "TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN, + "SOLANA_WS_URL": SOLANA_WS_URL, + "SOLANA_HTTP_URL": SOLANA_HTTP_URL, + "DISPLAY_CURRENCY": DISPLAY_CURRENCY, + "BOT_NAME": BOT_NAME, + } \ No newline at end of file diff --git a/crypto/sol/modules/SolanaAPI.py b/crypto/sol/modules/SolanaAPI.py new file mode 100644 index 0000000..7f5cda2 --- /dev/null +++ b/crypto/sol/modules/SolanaAPI.py @@ -0,0 +1,747 @@ +import sys +import os + +import aiohttp +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import asyncio +import json +import logging +import random +import websockets +from typing import Dict, List, Optional +import requests +from datetime import datetime +from solana.rpc.types import TokenAccountOpts, TxOpts + +logger = logging.getLogger(__name__) + +SOLANA_ENDPOINTS = [ + "wss://api.mainnet-beta.solana.com", +] +PING_INTERVAL = 30 +SUBSCRIBE_INTERVAL = 1*60 # Resubscribe every 1 minute + +from config import ( + FOLLOWED_WALLET, SOLANA_HTTP_URL, DISPLAY_CURRENCY +) + +from modules.utils import telegram_utils + +class SolanaWS: + def __init__(self, on_message: Optional[callable] = None): + self.websocket = None + self.subscription_id = None + self.message_queue = asyncio.Queue() + self.on_message = on_message + self.websocket = None + + async def connect(self): + while True: + try: + current_url = random.choice(SOLANA_ENDPOINTS) + self.websocket = await websockets.connect(current_url, ping_interval=30, ping_timeout=20) + logger.info(f"Connected to Solana websocket: {current_url}") + return + except Exception as e: + logger.error(f"Failed to connect to {current_url}: {e}") + await asyncio.sleep(5) + + async def ws_jsonrpc(self, ws, method, params=None, doProcessResponse = True): + if not isinstance(params, list): + params = [params] if params is not None else [] + + request = { + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params + } + + await ws.send(json.dumps(request)) + if not doProcessResponse: + return None + else: + response = await self.websocket.recv() + response_data = json.loads(response) + + if 'result' in response_data: + return response_data['result'] + elif 'error' in response_data: + logger.error(f"Error in WebSocket RPC call: {response_data['error']}") + return None + else: + logger.warning(f"Unexpected response: {response_data}") + return None + + async def subscribe(self): + params = [ + {"mentions": [FOLLOWED_WALLET]}, + {"commitment": "confirmed"} + ] + result = await self.ws_jsonrpc("logsSubscribe", params, doProcessResponse=False) + response = process_messages(self.websocket) + if result is not None: + self.subscription_id = result + logger.info(f"Subscription successful. Subscription id: {self.subscription_id}") + else: + logger.error("Failed to subscribe") + + async def unsubscribe(self): + if self.subscription_id: + result = await self.ws_jsonrpc("logsUnsubscribe", [self.subscription_id]) + if result: + logger.info(f"Unsubscribed from subscription id: {self.subscription_id}") + self.subscription_id = None + else: + logger.error(f"Failed to unsubscribe from subscription id: {self.subscription_id}") + + async def receive_messages(self): + while True: + try: + message = await self.websocket.recv() + await self.message_queue.put(message) + except websockets.exceptions.ConnectionClosedError: + logger.error("WebSocket connection closed") + break + except Exception as e: + logger.error(f"Error receiving message: {e}") + break + + async def process_messages(self): + while True: + message = await self.message_queue.get() + if self.on_message: + await self.on_message(message) + logger.info(f"Received message: {message}") + + async def close(self): + if self.websocket: + await self.websocket.close() + logger.info("WebSocket connection closed") + + async def solana_jsonrpc(method, params=None, jsonParsed=True): + if not isinstance(params, list): + params = [params] if params is not None else [] + + data = { + "jsonrpc": "2.0", + "id": 1, + "method": method, + "params": params + } + + if jsonParsed: + data["params"].append({"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}) + else: + data["params"].append({"maxSupportedTransactionVersion": 0}) + + try: + response = requests.post(SOLANA_HTTP_URL, headers={"Content-Type": "application/json"}, data=json.dumps(data)) + response.raise_for_status() + result = response.json() + if 'result' not in result or 'error' in result: + logger.error("Error fetching data from Solana RPC:", result) + return None + return result['result'] + except Exception as e: + logger.error(f"Error fetching data from Solana RPC: {e}") + return None + +class SolanaAPI: + def __init__(self, process_transaction_callback, on_initial_subscription_callback = None, on_bot_message=None): + self.process_transaction = process_transaction_callback + self.on_initial_subscription = on_initial_subscription_callback + self.on_bot_message = on_bot_message, + + self.dex = SolanaDEX(DISPLAY_CURRENCY) + self.solana_ws = SolanaWS(on_message=self.process_transaction) + + async def process_messages(self, solana_ws): + while True: + message = await solana_ws.message_queue.get() + await self.process_transaction(message) + + async def wallet_watch_loop(self): + + solana_ws = SolanaWS(on_message=self.process_transaction) + first_subscription = True + + while True: + try: + await solana_ws.connect() + await solana_ws.subscribe() + + if first_subscription: + asyncio.create_task(self.on_initial_subscription()) + first_subscription = False + + await self.on_bot_message(f"Solana mainnet connected ({solana_ws.subscription_id})...") + + receive_task = asyncio.create_task(solana_ws.receive_messages()) + process_task = asyncio.create_task(solana_ws.process_messages()) + + try: + await asyncio.gather(receive_task, process_task) + except asyncio.CancelledError: + pass + finally: + receive_task.cancel() + process_task.cancel() + + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + finally: + await solana_ws.unsubscribe() + if solana_ws.websocket: + await solana_ws.close() + await self.on_bot_message("Reconnecting...") + await asyncio.sleep(5) + + async def process_transaction(self, signature): + print(f"Processing transaction: {signature['signature']}") + # Add your transaction processing logic here + + async def get_last_transactions(self, account_address, check_interval=300, limit=1000): + last_check_time = None + last_signature = None + + while True: + current_time = datetime.now() + + if last_check_time is None or (current_time - last_check_time).total_seconds() >= check_interval: + params = [ + account_address, + { + "limit": limit + } + ] + + if last_signature: + params[1]["before"] = last_signature + + result = await solana_jsonrpc("getSignaturesForAddress", params) + + if result: + for signature in result: + if last_signature and signature['signature'] == last_signature: + break + + await self.process_transaction(signature) + + if result: + last_signature = result[0]['signature'] + + last_check_time = current_time + + await asyncio.sleep(1) + + async def get_token_metadata_symbol(mint_address): + global TOKENS_INFO + + if mint_address in TOKENS_INFO and 'symbol' in TOKENS_INFO[mint_address]: + return TOKENS_INFO[mint_address].get('symbol') + + try: + account_data_result = await self.solana_ws.solana_jsonrpc("getAccountInfo", mint_address) + if 'value' in account_data_result and 'data' in account_data_result['value']: + account_data_data = account_data_result['value']['data'] + if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: + account_data_info = account_data_data['parsed']['info'] + if 'decimals' in account_data_info: + if mint_address in TOKENS_INFO: + TOKENS_INFO[mint_address]['decimals'] = account_data_info['decimals'] + else: + TOKENS_INFO[mint_address] = {'decimals': account_data_info['decimals']} + if 'tokenName' in account_data_info: + if mint_address in TOKENS_INFO: + TOKENS_INFO[mint_address]['name'] = account_data_info['tokenName'] + else: + TOKENS_INFO[mint_address] = {'name': account_data_info['tokenName']} + + metadata = await get_token_metadata(mint_address) + if metadata: + if mint_address in TOKENS_INFO: + TOKENS_INFO[mint_address].update(metadata) + else: + TOKENS_INFO[mint_address] = metadata + await save_token_info() + # TOKENS_INFO[mint_address] = metadata + # return metadata.get('symbol') or metadata.get('name') + return TOKENS_INFO[mint_address].get('symbol') + except Exception as e: + logging.error(f"Error fetching token name for {mint_address}: {str(e)}") + return None + + async def get_transaction_details_rpc(tx_signature, readfromDump=False): + global FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES, TOKENS_INFO + try: + if readfromDump and os.path.exists('./logs/transation_details.json'): + with open('./logs/transation_details.json', 'r') as f: # trump_swap_tr_details + transaction_details = json.load(f) + return transaction_details + else: + transaction_details = await solana_jsonrpc("getTransaction", tx_signature) + with open('./logs/transation_details.json', 'w') as f: + json.dump(transaction_details, f, indent=2) + + if transaction_details is None: + logging.error(f"Error fetching transaction details for {tx_signature}") + return None + + # Initialize default result structure + parsed_result = { + "order_id": None, + "token_in": None, + "token_out": None, + "amount_in": 0, + "amount_out": 0, + "amount_in_USD": 0, + "amount_out_USD": 0, + "percentage_swapped": 0 + } + + # Extract order_id from logs + log_messages = transaction_details.get("meta", {}).get("logMessages", []) + for log in log_messages: + if "order_id" in log: + parsed_result["order_id"] = log.split(":")[2].strip() + break + + # Extract token transfers from innerInstructions + inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) + for instruction_set in inner_instructions: + for instruction in instruction_set.get('instructions', []): + if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') == 'transferChecked': + info = instruction['parsed']['info'] + mint = info['mint'] + amount = float(info['tokenAmount']['amount']) / 10 ** info['tokenAmount']['decimals'] # Adjust for decimals + + # Determine which token is being swapped in and out based on zero balances + if parsed_result["token_in"] is None and amount > 0: + parsed_result["token_in"] = mint + parsed_result["amount_in"] = amount + + + if parsed_result["token_in"] is None or parsed_result["token_out"] is None: + # if we've failed to extract token_in and token_out from the transaction details, try a second method + inner_instructions = transaction_details.get('meta', {}).get('innerInstructions', []) + transfers = [] + + for instruction_set in inner_instructions: + for instruction in instruction_set.get('instructions', []): + if instruction.get('program') == 'spl-token' and instruction.get('parsed', {}).get('type') in ['transfer', 'transferChecked']: + info = instruction['parsed']['info'] + amount = float(info['amount']) if 'amount' in info else float(info['tokenAmount']['amount']) + decimals = info['tokenAmount']['decimals'] if 'tokenAmount' in info else 0 + adjusted_amount = amount / (10 ** decimals) + # adjusted_amount = float(info["amount"]) / (10 ** (info["tokenAmount"]["decimals"] if 'tokenAmount' in info else 0)) + transfers.append({ + 'mint': info.get('mint'), + 'amount': adjusted_amount, + 'source': info['source'], + 'destination': info['destination'] + }) + + # Identify token_in and token_out + if len(transfers) >= 2: + parsed_result["token_in"] = transfers[0]['mint'] + parsed_result["amount_in"] = transfers[0]['amount'] + parsed_result["token_out"] = transfers[-1]['mint'] + parsed_result["amount_out"] = transfers[-1]['amount'] + + # If mint is not provided, query the Solana network for the account data + if parsed_result["token_in"] is None or parsed_result["token_out"] is None: + #for transfer in transfers: + # do only first and last transfer + for transfer in [transfers[0], transfers[-1]]: + if transfer['mint'] is None: + # Query the Solana network for the account data + account_data_result = await solana_jsonrpc("getAccountInfo", transfer['source']) + + if 'value' in account_data_result and 'data' in account_data_result['value']: + account_data_value = account_data_result['value'] + account_data_data = account_data_value['data'] + if 'parsed' in account_data_data and 'info' in account_data_data['parsed']: + account_data_info = account_data_data['parsed']['info'] + if 'mint' in account_data_info: + transfer['mint'] = account_data_info['mint'] + if transfer['mint'] in TOKENS_INFO or 'decimals' not in TOKENS_INFO[transfer['mint']]: + await get_token_metadata_symbol(transfer['mint']) + # get actual prices + current_price = await get_token_prices([transfer['mint']]) + + if parsed_result["token_in"] is None: + parsed_result["token_in"] = transfer['mint'] + parsed_result["symbol_in"] = TOKENS_INFO[transfer['mint']]['symbol'] + parsed_result["amount_in"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] + parsed_result["amount_in_USD"] = parsed_result["amount_in"] * TOKENS_INFO[transfer['mint']].get('price', current_price[transfer['mint']]) + elif parsed_result["token_out"] is None: + parsed_result["token_out"] = transfer['mint'] + parsed_result["symbol_out"] = TOKENS_INFO[transfer['mint']]['symbol'] + parsed_result["amount_out"] = transfer['amount']/10**TOKENS_INFO[transfer['mint']]['decimals'] + parsed_result["amount_out_USD"] = parsed_result["amount_out"] * TOKENS_INFO[transfer['mint']]['price'] + + pre_balalnces = transaction_details.get('meta', {}).get('preTokenBalances', []) + for balance in pre_balalnces: + if balance['mint'] == parsed_result["token_in"] and balance['owner'] == FOLLOWED_WALLET: + parsed_result["before_source_balance"] = float(balance['uiTokenAmount']['amount']) / 10 ** balance['uiTokenAmount']['decimals'] + break + + + # Calculate percentage swapped + try: + if parsed_result["amount_in"] > 0 and 'before_source_balance' in parsed_result and parsed_result["before_source_balance"] > 0: + parsed_result["percentage_swapped"] = (parsed_result["amount_in"] / parsed_result["before_source_balance"]) * 100 + else: + # calculate based on total wallet value: FOLLOWED_WALLET_VALUE + parsed_result["percentage_swapped"] = (parsed_result["amount_in_USD"] / FOLLOWED_WALLET_VALUE) * 100 + except Exception as e: + logging.error(f"Error calculating percentage swapped: {e}") + + return parsed_result + + except requests.exceptions.RequestException as e: + print("Error fetching transaction details:", e) + + + + + +class SolanaDEX: + def __init__(self, DISPLAY_CURRENCY): + self.DISPLAY_CURRENCY = DISPLAY_CURRENCY + pass + + async def get_token_prices(token_addresses: List[str]) -> Dict[str, float]: + global TOKENS_INFO + + # Skip for USD + prices = {addr: 1.0 for addr in token_addresses if addr == "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v"} + remaining_tokens = [addr for addr in token_addresses if addr not in prices] + + # Try CoinGecko + coingecko_prices = await self.get_prices_from_coingecko(remaining_tokens) + prices.update(coingecko_prices) + + + # For remaining missing tokens, try Jupiter + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + jupiter_prices = await get_prices_from_jupiter(list(missing_tokens)) + prices.update(jupiter_prices) + + + # For tokens not found in CoinGecko, use DexScreener + missing_tokens = set(remaining_tokens) - set(coingecko_prices.keys()) + if missing_tokens: + dexscreener_prices = await get_prices_from_dexscreener(list(missing_tokens)) + prices.update(dexscreener_prices) + + # For remaining missing tokens, try Raydium + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + raydium_prices = await get_prices_from_raydium(list(missing_tokens)) + prices.update(raydium_prices) + + # For remaining missing tokens, try Orca + missing_tokens = set(remaining_tokens) - set(prices.keys()) + if missing_tokens: + orca_prices = await get_prices_from_orca(list(missing_tokens)) + prices.update(orca_prices) + + # If any tokens are still missing, set their prices to 0 + for token in set(token_addresses) - set(prices.keys()): + prices[token] = 0.0 + logging.warning(f"Price not found for token {token}. Setting to 0.") + + for token, price in prices.items(): + token_info = TOKENS_INFO.setdefault(token, {}) + if 'symbol' not in token_info: + token_info['symbol'] = await get_token_metadata_symbol(token) + token_info['price'] = price + + return prices + + async def get_prices_from_coingecko(token_addresses: List[str]) -> Dict[str, float]: + base_url = "https://api.coingecko.com/api/v3/simple/token_price/solana" + prices = {} + + async def fetch_single_price(session, address): + params = { + "contract_addresses": address, + "vs_currencies": DISPLAY_CURRENCY.lower() + } + try: + async with session.get(base_url, params=params) as response: + if response.status == 200: + data = await response.json() + if address in data and DISPLAY_CURRENCY.lower() in data[address]: + return address, data[address][DISPLAY_CURRENCY.lower()] + else: + logging.warning(f"Failed to get price for {address} from CoinGecko. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching price for {address} from CoinGecko: {str(e)}") + return address, None + + async with aiohttp.ClientSession() as session: + tasks = [fetch_single_price(session, address) for address in token_addresses] + results = await asyncio.gather(*tasks) + + for address, price in results: + if price is not None: + prices[address] = price + + return prices + + async def get_prices_from_dexscreener(token_addresses: List[str]) -> Dict[str, float]: + base_url = "https://api.dexscreener.com/latest/dex/tokens/" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + tasks = [fetch_token_data(session, f"{base_url}{address}") for address in token_addresses] + results = await asyncio.gather(*tasks) + + for address, result in zip(token_addresses, results): + if result and 'pairs' in result and result['pairs']: + pair = result['pairs'][0] # Use the first pair (usually the most liquid) + prices[address] = float(pair['priceUsd']) + else: + logging.warning(f"No price data found on DexScreener for token {address}") + except Exception as e: + logging.error(f"Error fetching token prices from DexScreener: {str(e)}") + + return prices + + async def get_prices_from_jupiter(token_addresses: List[str]) -> Dict[str, float]: + url = "https://price.jup.ag/v4/price" + params = { + "ids": ",".join(token_addresses) + } + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params) as response: + if response.status == 200: + data = await response.json() + for address, price_info in data.get('data', {}).items(): + if 'price' in price_info: + prices[address] = float(price_info['price']) + else: + logging.error(f"Failed to get token prices from Jupiter. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Jupiter: {str(e)}") + return prices + + # New function for Raydium + async def get_prices_from_raydium(token_addresses: List[str]) -> Dict[str, float]: + url = "https://api.raydium.io/v2/main/price" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + for address in token_addresses: + if address in data: + prices[address] = float(data[address]) + else: + logging.error(f"Failed to get token prices from Raydium. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Raydium: {str(e)}") + return prices + + # New function for Orca + async def get_prices_from_orca(token_addresses: List[str]) -> Dict[str, float]: + url = "https://api.orca.so/allTokens" + prices = {} + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + data = await response.json() + for token_info in data: + if token_info['mint'] in token_addresses: + prices[token_info['mint']] = float(token_info['price']) + else: + logging.error(f"Failed to get token prices from Orca. Status: {response.status}") + except Exception as e: + logging.error(f"Error fetching token prices from Orca: {str(e)}") + return prices + + async def fetch_token_data(session, url): + try: + async with session.get(url) as response: + if response.status == 200: + return await response.json() + else: + logging.error(f"Failed to fetch data from {url}. Status: {response.status}") + return None + except Exception as e: + logging.error(f"Error fetching data from {url}: {str(e)}") + return None + + async def get_sol_price() -> float: + sol_address = "So11111111111111111111111111111111111111112" # Solana's wrapped SOL address + return await get_token_prices([sol_address]).get(sol_address, 0.0) + + async def get_wallet_balances(wallet_address, doGetTokenName=True): + balances = {} + logging.info(f"Getting balances for wallet: {wallet_address}") + global TOKENS_INFO + try: + response = await solana_client.get_token_accounts_by_owner_json_parsed( + Pubkey.from_string(wallet_address), + opts=TokenAccountOpts( + program_id=Pubkey.from_string("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA") + ), + commitment=Confirmed + ) + + if response.value: + for account in response.value: + try: + parsed_data = account.account.data.parsed + if isinstance(parsed_data, dict) and 'info' in parsed_data: + info = parsed_data['info'] + if isinstance(info, dict) and 'mint' in info and 'tokenAmount' in info: + mint = info['mint'] + decimals = info['tokenAmount']['decimals'] + amount = float(info['tokenAmount']['amount'])/10**decimals + if amount > 0: + if mint in TOKENS_INFO: + token_name = TOKENS_INFO[mint].get('symbol') + elif doGetTokenName: + token_name = await get_token_metadata_symbol(mint) or 'N/A' + # sleep for 1 second to avoid rate limiting + await asyncio.sleep(2) + + TOKENS_INFO[mint]['holdedAmount'] = round(amount,decimals) + TOKENS_INFO[mint]['decimals'] = decimals + balances[mint] = { + 'name': token_name or 'N/A', + 'address': mint, + 'amount': amount, + 'decimals': decimals + } + # sleep for 1 second to avoid rate limiting + logging.debug(f"Account balance for {token_name} ({mint}): {amount}") + else: + logging.warning(f"Unexpected data format for account: {account}") + except Exception as e: + logging.error(f"Error parsing account data: {str(e)}") + + sol_balance = await solana_client.get_balance(Pubkey.from_string(wallet_address)) + if sol_balance.value is not None: + balances['SOL'] = { + 'name': 'SOL', + 'address': 'SOL', + 'amount': sol_balance.value / 1e9 + } + else: + logging.warning(f"SOL balance response missing for wallet: {wallet_address}") + + except Exception as e: + logging.error(f"Error getting wallet balances: {str(e)}") + logging.info(f"Found {len(response.value)} ({len(balances)} non zero) token accounts for wallet: {wallet_address}") + return balances + + async def convert_balances_to_currency(balances , sol_price): + converted_balances = {} + for address, info in balances.items(): + converted_balance = info.copy() # Create a copy of the original info + if info['name'] == 'SOL': + converted_balance['value'] = info['amount'] * sol_price + elif address in TOKEN_PRICES: + converted_balance['value'] = info['amount'] * TOKEN_PRICES[address] + else: + converted_balance['value'] = None # Price not available + logging.warning(f"Price not available for token {info['name']} ({address})") + converted_balances[address] = converted_balance + return converted_balances + + + async def list_initial_wallet_states(): + global TOKEN_ADDRESSES, FOLLOWED_WALLET_VALUE, YOUR_WALLET_VALUE, TOKEN_PRICES + global TOKENS_INFO # new + + followed_wallet_balances = await get_wallet_balances(FOLLOWED_WALLET) + your_wallet_balances = await get_wallet_balances(YOUR_WALLET) + + all_token_addresses = list(set(followed_wallet_balances.keys()) | + set(your_wallet_balances.keys()) | + set(TOKEN_ADDRESSES.values())) + + TOKEN_PRICES = await get_token_prices(all_token_addresses) + sol_price = await get_sol_price() + + followed_converted_balances = await convert_balances_to_currency(followed_wallet_balances, sol_price) + your_converted_balances = await convert_balances_to_currency(your_wallet_balances, sol_price) + + + TOKEN_ADDRESSES = { + address: info for address, + info in {**followed_converted_balances, **your_converted_balances}.items() if info['value'] is not None and info['value'] > 0 + } + logging.info(f"Monitoring balances for tokens: {[info['name'] for info in TOKEN_ADDRESSES.values()]}") + + followed_wallet_state = [] + FOLLOWED_WALLET_VALUE = 0 + for address, info in followed_converted_balances.items(): + if info['value'] is not None and info['value'] > 0: + followed_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY} ({info['address']})") + FOLLOWED_WALLET_VALUE += info['value'] + + your_wallet_state = [] + YOUR_WALLET_VALUE = 0 + for address, info in your_converted_balances.items(): + if info['value'] is not None and info['value'] > 0: + your_wallet_state.append(f"{info['name']}: {info['value']:.2f} {DISPLAY_CURRENCY}") + YOUR_WALLET_VALUE += info['value'] + + message = ( + f"Initial Wallet States (All balances in {DISPLAY_CURRENCY}):\n\n" + f"Followed Wallet ({FOLLOWED_WALLET}):\n" + f"{chr(10).join(followed_wallet_state)}\n" + f"Total Value: {FOLLOWED_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Your Wallet ({YOUR_WALLET}):\n" + f"{chr(10).join(your_wallet_state)}\n" + f"Total Value: {YOUR_WALLET_VALUE:.2f} {DISPLAY_CURRENCY}\n\n" + f"Monitored Tokens:\n" + f"{', '.join([safe_get_property(info, 'name') for info in TOKEN_ADDRESSES.values()])}" + ) + + logging.info(message) + await telegram_utils.send_telegram_message(message) + + # save token info to file + await save_token_info() + + + +#example +# async def main(): +# await telegram_utils.initialize() + +# async def process_log(log): +# print(f"Processing log: {log}") + +# async def list_initial_wallet_states(): +# print("Listing initial wallet states") + + +# wallet_watch_task = asyncio.create_task(solana_api.wallet_watch_loop()) + +# try: +# await asyncio.gather(wallet_watch_task) +# except asyncio.CancelledError: +# pass +# finally: +# await telegram_utils.close() + +# if __name__ == "__main__": +# asyncio.run(main()) \ No newline at end of file diff --git a/crypto/sol/modules/__init__.py b/crypto/sol/modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/crypto/sol/modules/storage.py b/crypto/sol/modules/storage.py new file mode 100644 index 0000000..26ea7e2 --- /dev/null +++ b/crypto/sol/modules/storage.py @@ -0,0 +1,328 @@ +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import aiosqlite +import json +from datetime import datetime + +DATABASE_FILE = "./app_data.db" + +async def init_db(): + async with aiosqlite.connect(DATABASE_FILE) as db: + await db.executescript(""" + CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + username TEXT UNIQUE NOT NULL, + password_hash TEXT NOT NULL, + email TEXT UNIQUE NOT NULL, + api_key TEXT UNIQUE, + plan TEXT DEFAULT 'free' + ); + + CREATE TABLE IF NOT EXISTS wallets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER, + address TEXT NOT NULL, + name TEXT, + FOREIGN KEY (user_id) REFERENCES users(id) + ); + + CREATE TABLE IF NOT EXISTS transactions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + wallet_id INTEGER, + timestamp TEXT, + type TEXT, + sell_currency TEXT, + sell_amount REAL, + sell_value REAL, + buy_currency TEXT, + buy_amount REAL, + buy_value REAL, + closed BOOLEAN DEFAULT 0, + details TEXT, + solana_signature TEXT UNIQUE, + FOREIGN KEY (wallet_id) REFERENCES wallets(id) + ); + + CREATE TABLE IF NOT EXISTS holdings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + wallet_id INTEGER, + currency TEXT, + amount REAL, + last_updated TEXT, + FOREIGN KEY (wallet_id) REFERENCES wallets(id) + ); + + CREATE TABLE IF NOT EXISTS price_alerts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER, + currency TEXT, + target_price REAL, + alert_type TEXT, + FOREIGN KEY (user_id) REFERENCES users(id) + ); + + CREATE TABLE IF NOT EXISTS followed_accounts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER, + address TEXT, + followed_address TEXT, + name TEXT, + FOREIGN KEY (address) REFERENCES wallets(address), + FOREIGN KEY (followed_address) REFERENCES wallets(address), + FOREIGN KEY (user_id) REFERENCES users(id) + ); + """) + await db.commit() + +async def store_transaction(wallet_id, transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, details=None): + async with aiosqlite.connect(DATABASE_FILE) as db: + await db.execute(""" + INSERT INTO transactions (wallet_id, timestamp, type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, details) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, (wallet_id, datetime.now().isoformat(), transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, json.dumps(details or {}))) + await db.commit() + + +async def update_holdings(wallet_id, currency, amount_change): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute("SELECT amount FROM holdings WHERE wallet_id = ? AND currency = ?", (wallet_id, currency)) + result = await cursor.fetchone() + if result: + new_amount = result[0] + amount_change + await db.execute("UPDATE holdings SET amount = ?, last_updated = ? WHERE wallet_id = ? AND currency = ?", + (new_amount, datetime.now().isoformat(), wallet_id, currency)) + else: + await db.execute("INSERT INTO holdings (wallet_id, currency, amount, last_updated) VALUES (?, ?, ?, ?)", + (wallet_id, currency, amount_change, datetime.now().isoformat())) + await db.commit() + +async def get_wallet_holdings(wallet_id): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute("SELECT currency, amount FROM holdings WHERE wallet_id = ?", (wallet_id,)) + return await cursor.fetchall() + +async def get_transaction_history(wallet_id, start_date=None, end_date=None, include_closed=False): + async with aiosqlite.connect(DATABASE_FILE) as db: + query = "SELECT * FROM transactions WHERE wallet_id = ?" + params = [wallet_id] + if not include_closed: + query += " AND closed = 0" + if start_date: + query += " AND timestamp >= ?" + params.append(start_date) + if end_date: + query += " AND timestamp <= ?" + params.append(end_date) + query += " ORDER BY timestamp DESC" + cursor = await db.execute(query, params) + return await cursor.fetchall() + +# New utility functions + +async def close_transaction(transaction_id): + async with aiosqlite.connect(DATABASE_FILE) as db: + await db.execute("UPDATE transactions SET closed = 1 WHERE id = ?", (transaction_id,)) + await db.commit() + +async def get_open_transactions(wallet_id, currency): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute(""" + SELECT * FROM transactions + WHERE wallet_id = ? AND buy_currency = ? AND closed = 0 + ORDER BY timestamp ASC + """, (wallet_id, currency)) + return await cursor.fetchall() + +async def calculate_current_holdings(wallet_id): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute(""" + SELECT + buy_currency AS currency, + SUM(buy_amount) - COALESCE( + (SELECT SUM(sell_amount) + FROM transactions t2 + WHERE t2.wallet_id = t1.wallet_id + AND t2.sell_currency = t1.buy_currency + AND t2.closed = 0), + 0 + ) AS amount + FROM transactions t1 + WHERE wallet_id = ? AND closed = 0 + GROUP BY buy_currency + HAVING amount > 0 + """, (wallet_id,)) + return await cursor.fetchall() + +STABLECOINS = ['USDC', 'USDT', 'SOL'] + +async def is_transaction_closed(wallet_id, transaction_id): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute(""" + SELECT t1.buy_currency, t1.buy_amount, + (SELECT SUM(sell_amount) + FROM transactions t2 + WHERE t2.wallet_id = t1.wallet_id + AND t2.sell_currency = t1.buy_currency + AND t2.timestamp > t1.timestamp) AS sold_amount + FROM transactions t1 + WHERE t1.id = ? AND t1.wallet_id = ? + """, (transaction_id, wallet_id)) + result = await cursor.fetchone() + + if result: + buy_currency, buy_amount, sold_amount = result + return sold_amount is not None and sold_amount >= buy_amount + return False + +async def close_completed_transactions(wallet_id): + async with aiosqlite.connect(DATABASE_FILE) as db: + cursor = await db.execute(""" + SELECT id FROM transactions + WHERE wallet_id = ? AND closed = 0 AND buy_currency NOT IN (?) + """, (wallet_id, ','.join(STABLECOINS))) + transactions = await cursor.fetchall() + + for (transaction_id,) in transactions: + if await is_transaction_closed(wallet_id, transaction_id): + await close_transaction(transaction_id) + +async def get_profit_loss(wallet_id, currency, start_date=None, end_date=None): + async with aiosqlite.connect(DATABASE_FILE) as db: + query = """ + SELECT + SUM(CASE WHEN sell_currency = ? THEN sell_value ELSE -buy_value END) as profit_loss + FROM transactions + WHERE wallet_id = ? AND (sell_currency = ? OR buy_currency = ?) + """ + params = [currency, wallet_id, currency, currency] + + if start_date: + query += " AND timestamp >= ?" + params.append(start_date) + if end_date: + query += " AND timestamp <= ?" + params.append(end_date) + + cursor = await db.execute(query, params) + result = await cursor.fetchone() + return result[0] if result else 0 + +# # # # # # USERS + +# For this example, we'll use a simple dictionary to store users +users = { + "db": {"id": 1, "username": "db", "email": "user1@example.com", "password": "db"}, + "popov": {"id": 2, "username": "popov", "email": "user2@example.com", "password": "popov"} +} + +def get_or_create_user(email, google_id): + user = next((u for u in users.values() if u['email'] == email), None) + if not user: + user_id = max(u['id'] for u in users.values()) + 1 + username = email.split('@')[0] # Use the part before @ as username + user = { + 'id': user_id, + 'username': username, + 'email': email, + 'google_id': google_id + } + users[username] = user + return user + +def authenticate_user(username, password): + """ + Authenticate a user based on username and password. + Returns user data if authentication is successful, None otherwise. + """ + user = users.get(username) + if user and user['password'] == password: + return {"id": user['id'], "username": user['username'], "email": user['email']} + return None + +def get_user_by_id(user_id): + """ + Retrieve a user by their ID. + """ + for user in users.values(): + if user['id'] == int(user_id): + return {"id": user['id'], "username": user['username'], "email": user['email']} + return None + +def store_api_key(user_id, api_key): + """ + Store the generated API key for a user. + """ + # In a real application, you would store this in a database + # For this example, we'll just print it + print(f"Storing API key {api_key} for user {user_id}") + + + + + +# async def get_new_transactions(wallet_address, rpc_url): +# async with AsyncClient(rpc_url) as client: +# last_tx = await get_last_stored_transaction(wallet_address) + +# if last_tx: +# last_signature, last_timestamp = last_tx +# else: +# # If no transactions are stored, we'll fetch all transactions +# last_signature = None +# last_timestamp = None + +# new_transactions = [] + +# # Get the transaction history for the wallet +# tx_history = await client.get_signatures_for_address(wallet_address, before=last_signature) + +# for tx in tx_history.value: +# # Check if the transaction is newer than the last stored one +# if not last_timestamp or tx.block_time > datetime.fromisoformat(last_timestamp).timestamp(): +# # Fetch the full transaction details +# tx_details = await client.get_transaction(tx.signature, commitment=Confirmed) +# new_transactions.append(tx_details) + +# return new_transactions + +# async def process_new_transactions(wallet_id, wallet_address, rpc_url): +# new_transactions = await get_new_transactions(wallet_address, rpc_url) + +# for tx in new_transactions: +# # Process the transaction and extract relevant information +# # This is a placeholder - you'll need to implement the actual logic based on your requirements +# transaction_type = "swap" # Determine the type based on the transaction data +# sell_currency = "SOL" # Extract from transaction data +# sell_amount = 1.0 # Extract from transaction data +# sell_value = 100.0 # Extract from transaction data +# buy_currency = "USDC" # Extract from transaction data +# buy_amount = 100.0 # Extract from transaction data +# buy_value = 100.0 # Extract from transaction data +# solana_signature = tx.transaction.signatures[0] + +# # Store the transaction in the database +# await store_transaction( +# wallet_id, transaction_type, sell_currency, sell_amount, sell_value, +# buy_currency, buy_amount, buy_value, solana_signature +# ) + +# # Update holdings +# await update_holdings(wallet_id, sell_currency, -sell_amount) +# await update_holdings(wallet_id, buy_currency, buy_amount) + +# # After processing all new transactions, close completed transactions +# await close_completed_transactions(wallet_id) + + + +# Example usage +if __name__ == "__main__": + import asyncio + + async def main(): + await init_db() + # Add more test functions here + + asyncio.run(main()) \ No newline at end of file diff --git a/crypto/sol/modules/utils.py b/crypto/sol/modules/utils.py new file mode 100644 index 0000000..6ed7ab7 --- /dev/null +++ b/crypto/sol/modules/utils.py @@ -0,0 +1,43 @@ +# telegram_utils.py +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import aiohttp +import logging +from telegram import Bot +from telegram.constants import ParseMode +from config import TELEGRAM_BOT_TOKEN, DEVELOPER_CHAT_ID, BOT_NAME + +class TelegramUtils: + def __init__(self): + self.bot = None + self.conn_pool = None + self.timeout = None + + async def initialize(self): + # Create a custom connection pool + self.conn_pool = aiohttp.TCPConnector(limit=100) # Increase the connection limit + self.timeout = aiohttp.ClientTimeout(total=30) # Set a longer timeout + + # Initialize Telegram Bot + self.bot = Bot(token=TELEGRAM_BOT_TOKEN) + + async def send_telegram_message(self, message): + if not self.bot: + await self.initialize() + + try: + # await self.bot.send_message(chat_id=DEVELOPER_CHAT_ID, text=f"[{BOT_NAME}] {message}", parse_mode=ParseMode.HTML) + logging.info(f"Telegram message sent: {message}") + except Exception as e: + logging.error(f"Error sending Telegram message: {str(e)}") + + async def close(self): + if self.conn_pool: + await self.conn_pool.close() + +# Create a global instance of TelegramUtils +telegram_utils = TelegramUtils() + +# You can add more Telegram-related methods to the TelegramUtils class if needed \ No newline at end of file diff --git a/crypto/sol/modules/webui.py b/crypto/sol/modules/webui.py new file mode 100644 index 0000000..4dee698 --- /dev/null +++ b/crypto/sol/modules/webui.py @@ -0,0 +1,112 @@ +from flask import Flask, jsonify, request, render_template, redirect, url_for +# from flask_oauthlib.client import OAuth +from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user +import secrets +from modules import storage +import os + +def init_app(): + app = Flask(__name__, template_folder='../templates', static_folder='../static') + app.config['SECRET_KEY'] = 'your-secret-key' + login_manager = LoginManager(app) + login_manager.login_view = 'login' + + # oauth = OAuth(app) + # google = oauth.remote_app( + # 'google', + # consumer_key='YOUR_GOOGLE_CLIENT_ID', + # consumer_secret='YOUR_GOOGLE_CLIENT_SECRET', + # request_token_params={ + # 'scope': 'email' + # }, + # base_url='https://www.googleapis.com/oauth2/v1/', + # request_token_url=None, + # access_token_method='POST', + # access_token_url='https://accounts.google.com/o/oauth2/token', + # authorize_url='https://accounts.google.com/o/oauth2/auth', + # ) + + + login_manager = LoginManager() + login_manager.init_app(app) + + @app.route('/login/google/authorized') + def authorized(): + # resp = google.authorized_response() + # if resp is None or resp.get('access_token') is None: + # return 'Access denied: reason={} error={}'.format( + # request.args['error_reason'], + # request.args['error_description'] + # ) + # session['google_token'] = (resp['access_token'], '') + # user_info = google.get('userinfo') + # user = storage.get_or_create_user(user_info.data['email'], user_info.data['id']) + # login_user(user) + return redirect(url_for('index')) + + + class User(UserMixin): + def __init__(self, id, username, email): + self.id = id + self.username = username + self.email = email + + @login_manager.user_loader + def load_user(user_id): + user_data = storage.get_user_by_id(user_id) + if user_data: + return User(id=user_data['id'], username=user_data['username'], email=user_data['email']) + return None + + @app.route('/') + def index(): + return render_template('index.html') + + @app.route('/login', methods=['GET', 'POST']) + def login(): + if request.method == 'POST': + username = request.form.get('username') + password = request.form.get('password') + user = storage.authenticate_user(username, password) + if user: + login_user(User(id=user['id'], username=user['username'], email=user['email'])) + return redirect(url_for('dashboard')) + else: + return render_template('login.html', error='Invalid credentials') + elif request.args.get('google'): + return google.authorize(callback=url_for('authorized', _external=True)) + return render_template('login.html') + + @app.route('/logout') + @login_required + def logout(): + logout_user() + return redirect(url_for('index')) + + @app.route('/dashboard') + @login_required + def dashboard(): + return render_template('dashboard.html') + + @app.route('/generate_api_key', methods=['POST']) + @login_required + def generate_api_key(): + api_key = secrets.token_urlsafe(32) + storage.store_api_key(current_user.id, api_key) + return jsonify({'api_key': api_key}) + + @app.route('/wallet//transactions', methods=['GET']) + @login_required + def get_transactions(wallet_id): + transactions = storage.get_transactions(wallet_id) + return jsonify(transactions) + + @app.route('/wallet//holdings', methods=['GET']) + @login_required + def get_holdings(wallet_id): + holdings = storage.get_holdings(wallet_id) + return jsonify(holdings) + +# Implement other routes for reports, price alerts, following accounts, etc. + + return app \ No newline at end of file diff --git a/crypto/sol/requirements.in b/crypto/sol/requirements.in deleted file mode 100644 index 284d70d..0000000 --- a/crypto/sol/requirements.in +++ /dev/null @@ -1,12 +0,0 @@ -aiohttp==3.10.9 -aiohttp==3.10.5 -base58==2.1.1 -dexscreener==1.1 -Flask==3.0.3 -jupiter_python_sdk==0.0.2.0 -python-dotenv==1.0.1 -python-telegram-bot==21.6 -Requests==2.32.3 -solana==0.34.3 -solders==0.21.0 -websockets==10.4 diff --git a/crypto/sol/requirements.txt b/crypto/sol/requirements.txt index afc1b11..7d2e9ed 100644 --- a/crypto/sol/requirements.txt +++ b/crypto/sol/requirements.txt @@ -1,7 +1,10 @@ aiohttp==3.10.9 +aiosqlite base58==2.1.1 dexscreener==1.1 Flask==3.0.3 +flask-login +flask-oauthlib jupiter_python_sdk==0.0.2.0 python-dotenv==1.0.1 python-telegram-bot==21.6 diff --git a/crypto/sol/static/app.js b/crypto/sol/static/app.js deleted file mode 100644 index 32143e6..0000000 --- a/crypto/sol/static/app.js +++ /dev/null @@ -1,28 +0,0 @@ -document.getElementById('connectWallet').addEventListener('click', async () => { - try { - const { solana } is window; - if (solana && solana.isPhantom) { - const response = await solana.connect({ onlyIfTrusted: true }); - console.log('Connected with Public Key:', response.publicKey.toString()); - } else { - alert('Phantom wallet not found. Please install it.'); - } - } catch (error) { - console.error(error); - alert('Connection to Phantom Wallet failed'); - } -}); - -document.getElementById('swapToken').addEventListener('click', () => { - const tokenName = document.getElementById('tokenName').value; - const amount = document.getElementById('amount').value; - fetch('/swap', { - method: 'POST', - headers: { - 'Content-Type': 'application/json' - }, - body: JSON.stringify({token_name: tokenName, amount: amount}) - }) - .then(response => response.json()) - .then(data => alert(data.message)); -}); diff --git a/crypto/sol/static/css/styles.css b/crypto/sol/static/css/styles.css new file mode 100644 index 0000000..d2db183 --- /dev/null +++ b/crypto/sol/static/css/styles.css @@ -0,0 +1,46 @@ +/* Add your custom styles here */ +body { + font-family: Arial, sans-serif; + line-height: 1.6; + margin: 0; + padding: 0; +} + +header { + background-color: #4A90E2; + color: white; + padding: 1rem; +} + +nav ul { + list-style-type: none; + padding: 0; +} + +nav ul li { + display: inline; + margin-right: 1rem; +} + +nav ul li a { + color: white; + text-decoration: none; +} + +main { + padding: 2rem; +} + +footer { + background-color: #333; + color: white; + text-align: center; + padding: 1rem; + position: fixed; + bottom: 0; + width: 100%; +} + +@media (max-width: 768px) { + /* Add responsive styles for mobile devices */ +} \ No newline at end of file diff --git a/crypto/sol/static/images/logo.png b/crypto/sol/static/images/logo.png new file mode 100644 index 0000000..e69de29 diff --git a/crypto/sol/static/js/app.js b/crypto/sol/static/js/app.js new file mode 100644 index 0000000..80885c5 --- /dev/null +++ b/crypto/sol/static/js/app.js @@ -0,0 +1,49 @@ +document.addEventListener('DOMContentLoaded', () => { + const connectWalletButton = document.getElementById('connectWallet'); + const swapTokenButton = document.getElementById('swapToken'); + const generateApiKeyButton = document.getElementById('generate-api-key'); + const apiKeyDisplay = document.getElementById('api-key-display'); + + if (connectWalletButton) { + connectWalletButton.addEventListener('click', async () => { + try { + const { solana } = window; + if (solana && solana.isPhantom) { + const response = await solana.connect({ onlyIfTrusted: true }); + console.log('Connected with Public Key:', response.publicKey.toString()); + } else { + alert('Phantom wallet not found. Please install it.'); + } + } catch (error) { + console.error(error); + alert('Connection to Phantom Wallet failed'); + } + }); + } + + if (swapTokenButton) { + swapTokenButton.addEventListener('click', () => { + const tokenName = document.getElementById('tokenName').value; + const amount = document.getElementById('amount').value; + fetch('/swap', { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({token_name: tokenName, amount: amount}) + }) + .then(response => response.json()) + .then(data => alert(data.message)); + }); + } + + if (generateApiKeyButton) { + generateApiKeyButton.addEventListener('click', async () => { + const response = await fetch('/generate_api_key', { method: 'POST' }); + const data = await response.json(); + apiKeyDisplay.textContent = `Your API Key: ${data.api_key}`; + }); + } + + // Add more JavaScript for fetching and displaying wallet data, transactions, and holdings +}); \ No newline at end of file diff --git a/crypto/sol/static/manifest.json b/crypto/sol/static/manifest.json new file mode 100644 index 0000000..4ea8f8c --- /dev/null +++ b/crypto/sol/static/manifest.json @@ -0,0 +1,20 @@ +{ + "name": "Crypto Portfolio Tracker", + "short_name": "CryptoTracker", + "start_url": "/", + "display": "standalone", + "background_color": "#ffffff", + "theme_color": "#4A90E2", + "icons": [ + { + "src": "/static/images/logo-192x192.png", + "sizes": "192x192", + "type": "image/png" + }, + { + "src": "/static/images/logo-512x512.png", + "sizes": "512x512", + "type": "image/png" + } + ] + } \ No newline at end of file diff --git a/crypto/sol/static/service-worker.js b/crypto/sol/static/service-worker.js new file mode 100644 index 0000000..0559efc --- /dev/null +++ b/crypto/sol/static/service-worker.js @@ -0,0 +1,8 @@ +// Add service worker code for offline functionality and caching +self.addEventListener('install', (event) => { + // Perform install steps + }); + + self.addEventListener('fetch', (event) => { + // Handle fetch events + }); \ No newline at end of file diff --git a/crypto/sol/templates/base.html b/crypto/sol/templates/base.html new file mode 100644 index 0000000..cdd2ba2 --- /dev/null +++ b/crypto/sol/templates/base.html @@ -0,0 +1,36 @@ + + + + + + {% block title %}Crypto Portfolio Tracker{% endblock %} + + + + + +
+ +
+ +
+ {% block content %}{% endblock %} +
+ +
+

© 2023 Crypto Portfolio Tracker

+
+ + + + \ No newline at end of file diff --git a/crypto/sol/templates/dashboard.html b/crypto/sol/templates/dashboard.html new file mode 100644 index 0000000..7733d1f --- /dev/null +++ b/crypto/sol/templates/dashboard.html @@ -0,0 +1,23 @@ +{% extends "base.html" %} + +{% block content %} +

Dashboard

+

Welcome, {{ current_user.username }}!

+ +

Your Wallets

+
+ +

Recent Transactions

+
+ +

Holdings

+
+ + +

+ + +{% endblock %} \ No newline at end of file diff --git a/crypto/sol/templates/index.html b/crypto/sol/templates/index.html index 0de89fe..5ebd64a 100644 --- a/crypto/sol/templates/index.html +++ b/crypto/sol/templates/index.html @@ -1,21 +1,6 @@ - - - - - - Token Swapper - - -

Token Swapper

-
- -
-
- - - -
- - - - +{% extends "base.html" %} + +{% block content %} +

Welcome to Crypto Portfolio Tracker

+

Track your cryptocurrency investments with ease.

+{% endblock %} \ No newline at end of file diff --git a/crypto/sol/templates/login.html b/crypto/sol/templates/login.html new file mode 100644 index 0000000..49d940c --- /dev/null +++ b/crypto/sol/templates/login.html @@ -0,0 +1,18 @@ +{% extends "base.html" %} + +{% block content %} +

Login

+{% if error %} +

{{ error }}

+{% endif %} +
+ +

+ +

+ +
+
+Login with Google + +{% endblock %} \ No newline at end of file diff --git a/crypto/sol/templates/swap.html b/crypto/sol/templates/swap.html new file mode 100644 index 0000000..0de89fe --- /dev/null +++ b/crypto/sol/templates/swap.html @@ -0,0 +1,21 @@ + + + + + + Token Swapper + + +

Token Swapper

+
+ +
+
+ + + +
+ + + +