from typing import NamedTuple from enum import Enum from datetime import datetime import json from prisma import Prisma from typing import List, Optional, Dict import asyncio class Transaction(NamedTuple): wallet: str transaction_type: str symbol_in: str amount_in: float value_in_usd: float symbol_out: str amount_out: float value_out_usd: float tx_signature: str class TransactionType(Enum): BUY = "BUY" SELL = "SELL" class TransactionStatus(Enum): PENDING = "PENDING" SENT = "SENT" CONFIRMED = "CONFIRMED" class Storage: _instance: Optional['Storage'] = None _lock = asyncio.Lock() _initialized = False prisma = Prisma() # Class-level Prisma instance STABLECOINS = ['USDC', 'USDT', 'SOL'] def __new__(cls): if cls._instance is None: cls._instance = super(Storage, cls).__new__(cls) return cls._instance async def __ainit__(self): if self._initialized: return await self.prisma.connect() self._initialized = True @classmethod async def get_instance(cls) -> 'Storage': if not cls._instance: async with cls._lock: if not cls._instance: cls._instance = cls() await cls._instance.__ainit__() return cls._instance @classmethod def get_prisma(cls): return cls.prisma async def disconnect(self): if self._initialized: await self.prisma.disconnect() self._initialized = False def __init__(self): self.prisma = Prisma() self.users = { "db": {"id": 1, "username": "db", "email": "user1@example.com", "password": "db"}, "popov": {"id": 2, "username": "popov", "email": "user2@example.com", "password": "popov"} } def is_connected(self): return self.prisma.is_connected() async def connect(self): await self.prisma.connect() async def disconnect(self): await self.prisma.disconnect() async def store_transaction(self, transaction: Transaction): return await self.prisma.transaction.create( data={ 'wallet_id': transaction.wallet, 'timestamp': datetime.now().isoformat(), 'type': transaction.transaction_type, 'sell_currency': transaction.symbol_in, 'sell_amount': transaction.amount_in, 'sell_value': transaction.value_in_usd, 'buy_currency': transaction.symbol_out, 'buy_amount': transaction.amount_out, 'buy_value': transaction.value_out_usd, 'solana_signature': transaction.tx_signature, 'details': json.dumps({}), 'status': TransactionStatus.PENDING.value } ) async def update_holdings(self, wallet_id: str, currency: str, amount_change: float): holding = await self.prisma.holding.find_first( where={ 'wallet_id': wallet_id, 'currency': currency } ) if holding: new_amount = holding.amount + amount_change return await self.prisma.holding.update( where={'id': holding.id}, data={ 'amount': new_amount, 'last_updated': datetime.now().isoformat() } ) else: return await self.prisma.holding.create( data={ 'wallet_id': wallet_id, 'currency': currency, 'amount': amount_change, 'last_updated': datetime.now().isoformat() } ) async def get_wallet_holdings(self, wallet_id: str): return await self.prisma.holding.find_many( where={'wallet_id': wallet_id}, select={'currency': True, 'amount': True} ) async def get_transaction_history(self, wallet_id: str, start_date: Optional[str] = None, end_date: Optional[str] = None, include_closed: bool = False): filters = {'wallet_id': wallet_id} if not include_closed: filters['closed'] = False if start_date: filters['timestamp'] = {'gte': start_date} if end_date: filters['timestamp'] = {'lte': end_date} return await self.prisma.transaction.find_many( where=filters, order={'timestamp': 'desc'} ) async def close_transaction(self, transaction_id: int): return await self.prisma.transaction.update( where={'id': transaction_id}, data={'closed': True} ) async def get_open_transactions(self, wallet_id: str, currency: str): return await self.prisma.transaction.find_many( where={ 'wallet_id': wallet_id, 'buy_currency': currency, 'closed': False }, order={'timestamp': 'asc'} ) async def calculate_current_holdings(self, wallet_id: str): transactions = await self.prisma.transaction.group_by( by=['buy_currency'], where={'wallet_id': wallet_id, 'closed': False}, _sum={'buy_amount': True, 'sell_amount': True} ) return [ { 'currency': t.buy_currency, 'amount': t._sum.buy_amount - (t._sum.sell_amount or 0) } for t in transactions if t._sum.buy_amount > (t._sum.sell_amount or 0) ] async def is_transaction_closed(self, wallet_id: str, transaction_id: int) -> bool: transaction = await self.prisma.transaction.find_unique( where={'id': transaction_id} ) if transaction: sold_amount = await self.prisma.transaction.aggregate( _sum={'sell_amount': True}, where={ 'wallet_id': wallet_id, 'sell_currency': transaction.buy_currency, 'timestamp': {'gt': transaction.timestamp} } ) return sold_amount._sum.sell_amount >= transaction.buy_amount return False async def close_completed_transactions(self, wallet_id: str): transactions = await self.prisma.transaction.find_many( where={ 'wallet_id': wallet_id, 'closed': False, 'buy_currency': {'notIn': self.STABLECOINS} } ) for transaction in transactions: if await self.is_transaction_closed(wallet_id, transaction.id): await self.close_transaction(transaction.id) async def get_profit_loss(self, wallet_id: str, currency: str, start_date: Optional[str] = None, end_date: Optional[str] = None): filters = { 'wallet_id': wallet_id, 'OR': [ {'sell_currency': currency}, {'buy_currency': currency} ] } if start_date: filters['timestamp'] = {'gte': start_date} if end_date: filters['timestamp'] = {'lte': end_date} result = await self.prisma.transaction.aggregate( _sum={ 'sell_value': True, 'buy_value': True }, where=filters ) return (result._sum.sell_value or 0) - (result._sum.buy_value or 0) # User management methods def get_or_create_user(self, email: str, google_id: str): user = next((u for u in self.users.values() if u['email'] == email), None) if not user: user_id = max(u['id'] for u in self.users.values()) + 1 username = email.split('@')[0] user = { 'id': user_id, 'username': username, 'email': email, 'google_id': google_id } self.users[username] = user return user def authenticate_user(self, username: str, password: str): user = self.users.get(username) if user and user['password'] == password: return {"id": user['id'], "username": user['username'], "email": user['email']} return None def get_user_by_id(self, user_id: int): for user in self.users.values(): if user['id'] == int(user_id): return {"id": user['id'], "username": user['username'], "email": user['email']} return None def store_api_key(self, user_id: int, api_key: str): print(f"Storing API key {api_key} for user {user_id}") storage = Storage()