259 lines
8.6 KiB
Python
259 lines
8.6 KiB
Python
from typing import NamedTuple
|
|
from enum import Enum
|
|
from datetime import datetime
|
|
import json
|
|
from prisma import Prisma
|
|
from typing import List, Optional, Dict
|
|
import asyncio
|
|
|
|
class Transaction(NamedTuple):
    """Immutable record describing one swap, as handed to ``Storage.store_transaction``.

    The ``*_in`` fields are persisted as the ``sell_*`` columns and the
    ``*_out`` fields as the ``buy_*`` columns.
    """

    # Stored as the row's ``wallet_id``.
    wallet: str
    # Stored as the row's ``type``.
    transaction_type: str

    # Sell side of the swap.
    symbol_in: str
    amount_in: float
    value_in_usd: float

    # Buy side of the swap.
    symbol_out: str
    amount_out: float
    value_out_usd: float

    # Stored as the row's ``solana_signature``.
    tx_signature: str
|
|
|
|
class TransactionType(Enum):
    """Direction of a transaction; each member's value is its own name."""

    BUY = "BUY"
    SELL = "SELL"
|
|
class TransactionStatus(Enum):
    """Lifecycle state written to a transaction row's ``status`` column.

    ``Storage.store_transaction`` writes ``PENDING.value`` for new rows.
    """

    PENDING = "PENDING"
    SENT = "SENT"
    CONFIRMED = "CONFIRMED"
|
|
|
|
|
|
class Storage:
    """Singleton facade over the Prisma client plus an in-memory user table.

    ``Storage()`` always returns the same object. The database connection is
    opened lazily through ``get_instance()`` / ``connect()`` and shared via
    the class-level ``prisma`` client, so ``Storage.get_prisma()`` and
    ``instance.prisma`` refer to the same client.
    """

    _instance: Optional['Storage'] = None
    _lock = asyncio.Lock()
    _initialized = False
    prisma = Prisma()  # Class-level Prisma instance shared by the singleton

    # Currencies that close_completed_transactions() never auto-closes.
    STABLECOINS = ['USDC', 'USDT', 'SOL']

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Seed the in-memory user table the first time the singleton is built."""
        # __new__ hands back the same object on every Storage() call and
        # Python then re-runs __init__ on it. Without this guard each call
        # would wipe users added at runtime.
        if hasattr(self, 'users'):
            return
        # NOTE(review): hard-coded plaintext credentials; replace with a real
        # user store before this is exposed to untrusted callers.
        self.users = {
            "db": {"id": 1, "username": "db", "email": "user1@example.com", "password": "db"},
            "popov": {"id": 2, "username": "popov", "email": "user2@example.com", "password": "popov"},
        }

    async def __ainit__(self):
        """Open the database connection once; later calls are no-ops."""
        if self._initialized:
            return
        await self.connect()

    @classmethod
    async def get_instance(cls) -> 'Storage':
        """Return the singleton, making sure it has been connected.

        The initialised flag is checked as well as ``_instance`` because a
        plain ``Storage()`` call elsewhere populates ``_instance`` without
        ever opening the connection.
        """
        current = cls._instance
        if current is None or not current._initialized:
            async with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
                await cls._instance.__ainit__()
        return cls._instance

    @classmethod
    def get_prisma(cls):
        """Return the shared Prisma client."""
        return cls.prisma

    def is_connected(self):
        """Return the Prisma client's own connection state."""
        return self.prisma.is_connected()

    async def connect(self):
        """Connect the Prisma client if needed and mark the instance initialised."""
        if not self.prisma.is_connected():
            await self.prisma.connect()
        self._initialized = True

    async def disconnect(self):
        """Disconnect the Prisma client and reset the initialised flag.

        Resetting the flag lets a later ``get_instance()`` / ``connect()``
        reopen the connection.
        """
        if self.prisma.is_connected():
            await self.prisma.disconnect()
        self._initialized = False

    @staticmethod
    def _timestamp_range(start_date: Optional[str], end_date: Optional[str]) -> dict:
        """Build one ``timestamp`` filter holding whichever bounds were supplied."""
        bounds = {}
        if start_date:
            bounds['gte'] = start_date
        if end_date:
            bounds['lte'] = end_date
        return bounds

    @staticmethod
    def _public_user(user: dict) -> dict:
        """Return the id/username/email view of a user record."""
        return {"id": user['id'], "username": user['username'], "email": user['email']}

    async def store_transaction(self, transaction: Transaction):
        """Insert ``transaction`` as a new row and return the created record.

        The row is written with status ``PENDING``, an empty JSON ``details``
        object and the current local time as its timestamp.
        """
        return await self.prisma.transaction.create(
            data={
                'wallet_id': transaction.wallet,
                'timestamp': datetime.now().isoformat(),
                'type': transaction.transaction_type,
                'sell_currency': transaction.symbol_in,
                'sell_amount': transaction.amount_in,
                'sell_value': transaction.value_in_usd,
                'buy_currency': transaction.symbol_out,
                'buy_amount': transaction.amount_out,
                'buy_value': transaction.value_out_usd,
                'solana_signature': transaction.tx_signature,
                'details': json.dumps({}),
                'status': TransactionStatus.PENDING.value,
            }
        )

    async def update_holdings(self, wallet_id: str, currency: str, amount_change: float):
        """Add ``amount_change`` to a wallet's holding, creating the row if absent.

        Returns the updated or newly created holding record.
        """
        holding = await self.prisma.holding.find_first(
            where={
                'wallet_id': wallet_id,
                'currency': currency,
            }
        )
        if holding:
            # Read-modify-write: the new amount is computed here, not in the database.
            new_amount = holding.amount + amount_change
            return await self.prisma.holding.update(
                where={'id': holding.id},
                data={
                    'amount': new_amount,
                    'last_updated': datetime.now().isoformat(),
                },
            )
        return await self.prisma.holding.create(
            data={
                'wallet_id': wallet_id,
                'currency': currency,
                'amount': amount_change,
                'last_updated': datetime.now().isoformat(),
            }
        )

    async def get_wallet_holdings(self, wallet_id: str):
        """Return the holdings recorded for ``wallet_id``."""
        # NOTE(review): confirm the installed Prisma client accepts `select`
        # on find_many.
        return await self.prisma.holding.find_many(
            where={'wallet_id': wallet_id},
            select={'currency': True, 'amount': True},
        )

    async def get_transaction_history(self, wallet_id: str, start_date: Optional[str] = None,
                                      end_date: Optional[str] = None, include_closed: bool = False):
        """Return a wallet's transactions, newest first.

        Args:
            wallet_id: Wallet whose transactions are listed.
            start_date: Optional inclusive lower bound on ``timestamp``.
            end_date: Optional inclusive upper bound on ``timestamp``.
            include_closed: When false (default) only rows with
                ``closed == False`` are returned.
        """
        filters = {'wallet_id': wallet_id}
        if not include_closed:
            filters['closed'] = False
        # Both bounds go into one filter dict so end_date cannot overwrite
        # start_date when both are supplied.
        timestamp_bounds = self._timestamp_range(start_date, end_date)
        if timestamp_bounds:
            filters['timestamp'] = timestamp_bounds

        return await self.prisma.transaction.find_many(
            where=filters,
            order={'timestamp': 'desc'},
        )

    async def close_transaction(self, transaction_id: int):
        """Set ``closed`` to true on the given transaction and return the record."""
        return await self.prisma.transaction.update(
            where={'id': transaction_id},
            data={'closed': True},
        )

    async def get_open_transactions(self, wallet_id: str, currency: str):
        """Return unclosed transactions that bought ``currency``, oldest first."""
        return await self.prisma.transaction.find_many(
            where={
                'wallet_id': wallet_id,
                'buy_currency': currency,
                'closed': False,
            },
            order={'timestamp': 'asc'},
        )

    async def calculate_current_holdings(self, wallet_id: str):
        """Summarise a wallet's unclosed transactions per bought currency.

        Returns a list of ``{'currency', 'amount'}`` dicts for every group
        whose summed ``buy_amount`` exceeds its summed ``sell_amount``.
        """
        # NOTE(review): Prisma Client Python appears to document group_by with
        # a `sum=` keyword and dict results; confirm this call and the
        # attribute access below against the installed client.
        # NOTE(review): both sums come from the same rows grouped by
        # buy_currency, so sell_amount is presumably in a different currency
        # than buy_amount; confirm the subtraction is intended.
        grouped = await self.prisma.transaction.group_by(
            by=['buy_currency'],
            where={'wallet_id': wallet_id, 'closed': False},
            _sum={'buy_amount': True, 'sell_amount': True},
        )
        holdings = []
        for group in grouped:
            # A sum can come back as None; treat that as zero on both sides.
            bought = group._sum.buy_amount or 0
            sold = group._sum.sell_amount or 0
            if bought > sold:
                holdings.append({'currency': group.buy_currency, 'amount': bought - sold})
        return holdings

    async def is_transaction_closed(self, wallet_id: str, transaction_id: int) -> bool:
        """Return True when later sells cover the amount this transaction bought.

        Sums ``sell_amount`` over the wallet's transactions that sell this
        transaction's ``buy_currency`` with a later timestamp, and compares
        it to ``buy_amount``. Returns False when the transaction is missing.
        """
        transaction = await self.prisma.transaction.find_unique(
            where={'id': transaction_id}
        )
        if not transaction:
            return False

        # NOTE(review): confirm the installed Prisma client exposes
        # `aggregate` with this signature.
        sold = await self.prisma.transaction.aggregate(
            _sum={'sell_amount': True},
            where={
                'wallet_id': wallet_id,
                'sell_currency': transaction.buy_currency,
                'timestamp': {'gt': transaction.timestamp},
            },
        )
        # With no matching sells the sum is None; comparing None to a number
        # would raise TypeError, so fall back to zero.
        return (sold._sum.sell_amount or 0) >= transaction.buy_amount

    async def close_completed_transactions(self, wallet_id: str):
        """Close every open transaction of the wallet that is_transaction_closed() accepts.

        Transactions whose ``buy_currency`` is listed in ``STABLECOINS`` are skipped.
        """
        # NOTE(review): Prisma Client Python appears to spell this filter
        # `not_in`; confirm `notIn` is accepted by the installed client.
        candidates = await self.prisma.transaction.find_many(
            where={
                'wallet_id': wallet_id,
                'closed': False,
                'buy_currency': {'notIn': self.STABLECOINS},
            }
        )
        for transaction in candidates:
            if await self.is_transaction_closed(wallet_id, transaction.id):
                await self.close_transaction(transaction.id)

    async def get_profit_loss(self, wallet_id: str, currency: str,
                              start_date: Optional[str] = None, end_date: Optional[str] = None):
        """Return summed ``sell_value`` minus summed ``buy_value`` for a currency.

        Covers the wallet's transactions that have ``currency`` on either the
        sell or the buy side, optionally limited to an inclusive timestamp
        range. Missing sums count as zero.
        """
        filters = {
            'wallet_id': wallet_id,
            'OR': [
                {'sell_currency': currency},
                {'buy_currency': currency},
            ],
        }
        # Both bounds go into one filter dict so end_date cannot overwrite
        # start_date when both are supplied.
        timestamp_bounds = self._timestamp_range(start_date, end_date)
        if timestamp_bounds:
            filters['timestamp'] = timestamp_bounds

        # NOTE(review): confirm the installed Prisma client exposes
        # `aggregate` with this signature.
        result = await self.prisma.transaction.aggregate(
            _sum={
                'sell_value': True,
                'buy_value': True,
            },
            where=filters,
        )
        return (result._sum.sell_value or 0) - (result._sum.buy_value or 0)

    # User management methods
    def get_or_create_user(self, email: str, google_id: str):
        """Return the user with ``email``, creating an in-memory record if none exists.

        New users get the next free numeric id and a username derived from
        the local part of the email. ``google_id`` is stored on newly created
        records only.
        """
        existing = next((u for u in self.users.values() if u['email'] == email), None)
        if existing:
            return existing

        # default=0 keeps id allocation working on an empty user table.
        user_id = max((u['id'] for u in self.users.values()), default=0) + 1

        # The table is keyed by username, so a clashing local part would
        # silently replace another user's record; add a numeric suffix instead.
        base_name = email.split('@')[0]
        username = base_name
        suffix = 1
        while username in self.users:
            username = f"{base_name}{suffix}"
            suffix += 1

        user = {
            'id': user_id,
            'username': username,
            'email': email,
            'google_id': google_id,
        }
        self.users[username] = user
        return user

    def authenticate_user(self, username: str, password: str):
        """Return the public user dict when the password matches, otherwise None.

        Users created through ``get_or_create_user`` have no password and can
        never authenticate here.
        """
        user = self.users.get(username)
        if user is None:
            return None
        stored_password = user.get('password')
        # A missing password must not match anything, including password=None.
        if stored_password is None or stored_password != password:
            return None
        return self._public_user(user)

    def get_user_by_id(self, user_id: int):
        """Return the public user dict for ``user_id``, or None when unknown.

        ``user_id`` may be anything ``int()`` accepts; values that cannot be
        converted are treated as unknown ids.
        """
        try:
            wanted_id = int(user_id)
        except (TypeError, ValueError):
            return None
        for user in self.users.values():
            if user['id'] == wanted_id:
                return self._public_user(user)
        return None

    def store_api_key(self, user_id: int, api_key: str):
        """Report that an API key was received for ``user_id``.

        Nothing is persisted. The key is masked so the secret does not end up
        in stdout or captured logs.
        """
        key_text = str(api_key)
        masked_key = f"...{key_text[-4:]}" if len(key_text) > 4 else "****"
        print(f"Storing API key {masked_key} for user {user_id}")
|
|
|
|
|
|
# Module-level Storage instance, constructed when this module is imported.
storage = Storage()
|