gogo2/crypto/sol/modules/storage.py
2024-11-06 11:08:59 +02:00

212 lines
6.6 KiB
Python

import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import json
from datetime import datetime
import prisma
from prisma import Prisma
# Initialize the Prisma client
prisma_client = Prisma()
async def init_db():
await prisma_client.connect()
async def store_transaction(wallet_id, transaction_type, sell_currency, sell_amount, sell_value, buy_currency, buy_amount, buy_value, solana_signature, details=None):
"""
Store a transaction record in the database.
"""
await prisma_client.transaction.create(
data={
'wallet_id': wallet_id,
'timestamp': datetime.now().isoformat(),
'type': transaction_type,
'sell_currency': sell_currency,
'sell_amount': sell_amount,
'sell_value': sell_value,
'buy_currency': buy_currency,
'buy_amount': buy_amount,
'buy_value': buy_value,
'solana_signature': solana_signature,
'details': json.dumps(details or {})
}
)
async def update_holdings(wallet_id, currency, amount_change):
holding = await prisma_client.holding.find_first(
where={
'wallet_id': wallet_id,
'currency': currency
}
)
if holding:
new_amount = holding.amount + amount_change
await prisma_client.holding.update(
where={'id': holding.id},
data={
'amount': new_amount,
'last_updated': datetime.now().isoformat()
}
)
else:
await prisma_client.holding.create(
data={
'wallet_id': wallet_id,
'currency': currency,
'amount': amount_change,
'last_updated': datetime.now().isoformat()
}
)
async def get_wallet_holdings(wallet_id):
return await prisma_client.holding.find_many(
where={'wallet_id': wallet_id},
select={'currency': True, 'amount': True}
)
async def get_transaction_history(wallet_id, start_date=None, end_date=None, include_closed=False):
filters = {'wallet_id': wallet_id}
if not include_closed:
filters['closed'] = False
if start_date:
filters['timestamp'] = {'gte': start_date}
if end_date:
filters['timestamp'] = {'lte': end_date}
return await prisma_client.transaction.find_many(
where=filters,
order={'timestamp': 'desc'}
)
async def close_transaction(transaction_id):
await prisma_client.transaction.update(
where={'id': transaction_id},
data={'closed': True}
)
async def get_open_transactions(wallet_id, currency):
return await prisma_client.transaction.find_many(
where={
'wallet_id': wallet_id,
'buy_currency': currency,
'closed': False
},
order={'timestamp': 'asc'}
)
async def calculate_current_holdings(wallet_id):
transactions = await prisma_client.transaction.group_by(
by=['buy_currency'],
where={'wallet_id': wallet_id, 'closed': False},
_sum={'buy_amount': True, 'sell_amount': True}
)
return [
{
'currency': t.buy_currency,
'amount': t._sum.buy_amount - (t._sum.sell_amount or 0)
}
for t in transactions if t._sum.buy_amount > (t._sum.sell_amount or 0)
]
STABLECOINS = ['USDC', 'USDT', 'SOL']
async def is_transaction_closed(wallet_id, transaction_id):
transaction = await prisma_client.transaction.find_unique(
where={'id': transaction_id}
)
if transaction:
sold_amount = await prisma_client.transaction.aggregate(
_sum={'sell_amount': True},
where={
'wallet_id': wallet_id,
'sell_currency': transaction.buy_currency,
'timestamp': {'gt': transaction.timestamp}
}
)
return sold_amount._sum.sell_amount >= transaction.buy_amount
return False
async def close_completed_transactions(wallet_id):
transactions = await prisma_client.transaction.find_many(
where={
'wallet_id': wallet_id,
'closed': False,
'buy_currency': {'notIn': STABLECOINS}
}
)
for transaction in transactions:
if await is_transaction_closed(wallet_id, transaction.id):
await close_transaction(transaction.id)
async def get_profit_loss(wallet_id, currency, start_date=None, end_date=None):
filters = {
'wallet_id': wallet_id,
'OR': [
{'sell_currency': currency},
{'buy_currency': currency}
]
}
if start_date:
filters['timestamp'] = {'gte': start_date}
if end_date:
filters['timestamp'] = {'lte': end_date}
result = await prisma_client.transaction.aggregate(
_sum={
'sell_value': True,
'buy_value': True
},
where=filters
)
return (result._sum.sell_value or 0) - (result._sum.buy_value or 0)
# # # # # # USERS
# For this example, we'll use a simple dictionary to store users
users = {
"db": {"id": 1, "username": "db", "email": "user1@example.com", "password": "db"},
"popov": {"id": 2, "username": "popov", "email": "user2@example.com", "password": "popov"}
}
def get_or_create_user(email, google_id):
user = next((u for u in users.values() if u['email'] == email), None)
if not user:
user_id = max(u['id'] for u in users.values()) + 1
username = email.split('@')[0] # Use the part before @ as username
user = {
'id': user_id,
'username': username,
'email': email,
'google_id': google_id
}
users[username] = user
return user
def authenticate_user(username, password):
"""
Authenticate a user based on username and password.
Returns user data if authentication is successful, None otherwise.
"""
user = users.get(username)
if user and user['password'] == password:
return {"id": user['id'], "username": user['username'], "email": user['email']}
return None
def get_user_by_id(user_id):
"""
Retrieve a user by their ID.
"""
for user in users.values():
if user['id'] == int(user_id):
return {"id": user['id'], "username": user['username'], "email": user['email']}
return None
def store_api_key(user_id, api_key):
"""
Store the generated API key for a user.
"""
# In a real application, you would store this in a database
# For this example, we'll just print it
print(f"Storing API key {api_key} for user {user_id}")