gogo2/crypto/sol/modules/storage.py
2024-11-14 15:39:35 +02:00

259 lines
8.6 KiB
Python

from typing import NamedTuple
from enum import Enum
from datetime import datetime
import json
from prisma import Prisma
from typing import List, Optional, Dict
import asyncio
class Transaction(NamedTuple):
wallet: str
transaction_type: str
symbol_in: str
amount_in: float
value_in_usd: float
symbol_out: str
amount_out: float
value_out_usd: float
tx_signature: str
class TransactionType(Enum):
BUY = "BUY"
SELL = "SELL"
class TransactionStatus(Enum):
PENDING = "PENDING"
SENT = "SENT"
CONFIRMED = "CONFIRMED"
class Storage:
_instance: Optional['Storage'] = None
_lock = asyncio.Lock()
_initialized = False
prisma = Prisma() # Class-level Prisma instance
STABLECOINS = ['USDC', 'USDT', 'SOL']
def __new__(cls):
if cls._instance is None:
cls._instance = super(Storage, cls).__new__(cls)
return cls._instance
async def __ainit__(self):
if self._initialized:
return
await self.prisma.connect()
self._initialized = True
@classmethod
async def get_instance(cls) -> 'Storage':
if not cls._instance:
async with cls._lock:
if not cls._instance:
cls._instance = cls()
await cls._instance.__ainit__()
return cls._instance
@classmethod
def get_prisma(cls):
return cls.prisma
async def disconnect(self):
if self._initialized:
await self.prisma.disconnect()
self._initialized = False
def __init__(self):
self.prisma = Prisma()
self.users = {
"db": {"id": 1, "username": "db", "email": "user1@example.com", "password": "db"},
"popov": {"id": 2, "username": "popov", "email": "user2@example.com", "password": "popov"}
}
def is_connected(self):
return self.prisma.is_connected()
async def connect(self):
await self.prisma.connect()
async def disconnect(self):
await self.prisma.disconnect()
async def store_transaction(self, transaction: Transaction):
return await self.prisma.transaction.create(
data={
'wallet_id': transaction.wallet,
'timestamp': datetime.now().isoformat(),
'type': transaction.transaction_type,
'sell_currency': transaction.symbol_in,
'sell_amount': transaction.amount_in,
'sell_value': transaction.value_in_usd,
'buy_currency': transaction.symbol_out,
'buy_amount': transaction.amount_out,
'buy_value': transaction.value_out_usd,
'solana_signature': transaction.tx_signature,
'details': json.dumps({}),
'status': TransactionStatus.PENDING.value
}
)
async def update_holdings(self, wallet_id: str, currency: str, amount_change: float):
holding = await self.prisma.holding.find_first(
where={
'wallet_id': wallet_id,
'currency': currency
}
)
if holding:
new_amount = holding.amount + amount_change
return await self.prisma.holding.update(
where={'id': holding.id},
data={
'amount': new_amount,
'last_updated': datetime.now().isoformat()
}
)
else:
return await self.prisma.holding.create(
data={
'wallet_id': wallet_id,
'currency': currency,
'amount': amount_change,
'last_updated': datetime.now().isoformat()
}
)
async def get_wallet_holdings(self, wallet_id: str):
return await self.prisma.holding.find_many(
where={'wallet_id': wallet_id},
select={'currency': True, 'amount': True}
)
async def get_transaction_history(self, wallet_id: str, start_date: Optional[str] = None,
end_date: Optional[str] = None, include_closed: bool = False):
filters = {'wallet_id': wallet_id}
if not include_closed:
filters['closed'] = False
if start_date:
filters['timestamp'] = {'gte': start_date}
if end_date:
filters['timestamp'] = {'lte': end_date}
return await self.prisma.transaction.find_many(
where=filters,
order={'timestamp': 'desc'}
)
async def close_transaction(self, transaction_id: int):
return await self.prisma.transaction.update(
where={'id': transaction_id},
data={'closed': True}
)
async def get_open_transactions(self, wallet_id: str, currency: str):
return await self.prisma.transaction.find_many(
where={
'wallet_id': wallet_id,
'buy_currency': currency,
'closed': False
},
order={'timestamp': 'asc'}
)
async def calculate_current_holdings(self, wallet_id: str):
transactions = await self.prisma.transaction.group_by(
by=['buy_currency'],
where={'wallet_id': wallet_id, 'closed': False},
_sum={'buy_amount': True, 'sell_amount': True}
)
return [
{
'currency': t.buy_currency,
'amount': t._sum.buy_amount - (t._sum.sell_amount or 0)
}
for t in transactions if t._sum.buy_amount > (t._sum.sell_amount or 0)
]
async def is_transaction_closed(self, wallet_id: str, transaction_id: int) -> bool:
transaction = await self.prisma.transaction.find_unique(
where={'id': transaction_id}
)
if transaction:
sold_amount = await self.prisma.transaction.aggregate(
_sum={'sell_amount': True},
where={
'wallet_id': wallet_id,
'sell_currency': transaction.buy_currency,
'timestamp': {'gt': transaction.timestamp}
}
)
return sold_amount._sum.sell_amount >= transaction.buy_amount
return False
async def close_completed_transactions(self, wallet_id: str):
transactions = await self.prisma.transaction.find_many(
where={
'wallet_id': wallet_id,
'closed': False,
'buy_currency': {'notIn': self.STABLECOINS}
}
)
for transaction in transactions:
if await self.is_transaction_closed(wallet_id, transaction.id):
await self.close_transaction(transaction.id)
async def get_profit_loss(self, wallet_id: str, currency: str,
start_date: Optional[str] = None, end_date: Optional[str] = None):
filters = {
'wallet_id': wallet_id,
'OR': [
{'sell_currency': currency},
{'buy_currency': currency}
]
}
if start_date:
filters['timestamp'] = {'gte': start_date}
if end_date:
filters['timestamp'] = {'lte': end_date}
result = await self.prisma.transaction.aggregate(
_sum={
'sell_value': True,
'buy_value': True
},
where=filters
)
return (result._sum.sell_value or 0) - (result._sum.buy_value or 0)
# User management methods
def get_or_create_user(self, email: str, google_id: str):
user = next((u for u in self.users.values() if u['email'] == email), None)
if not user:
user_id = max(u['id'] for u in self.users.values()) + 1
username = email.split('@')[0]
user = {
'id': user_id,
'username': username,
'email': email,
'google_id': google_id
}
self.users[username] = user
return user
def authenticate_user(self, username: str, password: str):
user = self.users.get(username)
if user and user['password'] == password:
return {"id": user['id'], "username": user['username'], "email": user['email']}
return None
def get_user_by_id(self, user_id: int):
for user in self.users.values():
if user['id'] == int(user_id):
return {"id": user['id'], "username": user['username'], "email": user['email']}
return None
def store_api_key(self, user_id: int, api_key: str):
print(f"Storing API key {api_key} for user {user_id}")
storage = Storage()