#!/usr/bin/env python3
"""
Trade Audit Tool

This tool analyzes trade data to identify potential issues with:
- Duplicate entry prices
- Rapid consecutive trades
- P&L calculation accuracy
- Position tracking problems

Usage:
    python debug/trade_audit.py [--trades-file path/to/trades.json]
"""
|
|
|
|
import argparse
|
|
import json
|
|
import pandas as pd
|
|
import numpy as np
|
|
from datetime import datetime, timedelta
|
|
import matplotlib.pyplot as plt
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Add project root to path
|
|
project_root = Path(__file__).parent.parent
|
|
sys.path.insert(0, str(project_root))
|
|
|
|
def parse_trade_time(time_str):
|
|
"""Parse trade time string to datetime object"""
|
|
try:
|
|
# Try HH:MM:SS format
|
|
return datetime.strptime(time_str, "%H:%M:%S")
|
|
except ValueError:
|
|
try:
|
|
# Try full datetime format
|
|
return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
|
|
except ValueError:
|
|
# Return as is if parsing fails
|
|
return time_str
|
|
|
|
def load_trades_from_file(file_path):
|
|
"""Load trades from JSON file"""
|
|
try:
|
|
with open(file_path, 'r') as f:
|
|
return json.load(f)
|
|
except FileNotFoundError:
|
|
print(f"Error: File {file_path} not found")
|
|
return []
|
|
except json.JSONDecodeError:
|
|
print(f"Error: File {file_path} is not valid JSON")
|
|
return []
|
|
|
|
def load_trades_from_dashboard_cache():
|
|
"""Load trades from dashboard cache file if available"""
|
|
cache_paths = [
|
|
"cache/dashboard_trades.json",
|
|
"cache/closed_trades.json",
|
|
"data/trades_history.json"
|
|
]
|
|
|
|
for path in cache_paths:
|
|
if os.path.exists(path):
|
|
print(f"Loading trades from cache: {path}")
|
|
return load_trades_from_file(path)
|
|
|
|
print("No trade cache files found")
|
|
return []
|
|
|
|
def parse_trade_data(trades_data):
|
|
"""Parse trade data into a pandas DataFrame for analysis"""
|
|
parsed_trades = []
|
|
|
|
for trade in trades_data:
|
|
# Handle different trade data formats
|
|
parsed_trade = {}
|
|
|
|
# Time field might be named entry_time or time
|
|
if 'entry_time' in trade:
|
|
parsed_trade['time'] = parse_trade_time(trade['entry_time'])
|
|
elif 'time' in trade:
|
|
parsed_trade['time'] = parse_trade_time(trade['time'])
|
|
else:
|
|
parsed_trade['time'] = None
|
|
|
|
# Side might be named side or action
|
|
parsed_trade['side'] = trade.get('side', trade.get('action', 'UNKNOWN'))
|
|
|
|
# Size might be named size or quantity
|
|
parsed_trade['size'] = float(trade.get('size', trade.get('quantity', 0)))
|
|
|
|
# Entry and exit prices
|
|
parsed_trade['entry_price'] = float(trade.get('entry_price', trade.get('entry', 0)))
|
|
parsed_trade['exit_price'] = float(trade.get('exit_price', trade.get('exit', 0)))
|
|
|
|
# Hold time in seconds
|
|
parsed_trade['hold_time'] = float(trade.get('hold_time_seconds', trade.get('hold', 0)))
|
|
|
|
# P&L and fees
|
|
parsed_trade['pnl'] = float(trade.get('pnl', 0))
|
|
parsed_trade['fees'] = float(trade.get('fees', 0))
|
|
|
|
# Calculate expected P&L for verification
|
|
if parsed_trade['side'] == 'LONG' or parsed_trade['side'] == 'BUY':
|
|
expected_pnl = (parsed_trade['exit_price'] - parsed_trade['entry_price']) * parsed_trade['size']
|
|
else: # SHORT or SELL
|
|
expected_pnl = (parsed_trade['entry_price'] - parsed_trade['exit_price']) * parsed_trade['size']
|
|
|
|
parsed_trade['expected_pnl'] = expected_pnl
|
|
parsed_trade['pnl_difference'] = parsed_trade['pnl'] - expected_pnl
|
|
|
|
parsed_trades.append(parsed_trade)
|
|
|
|
# Convert to DataFrame
|
|
if parsed_trades:
|
|
df = pd.DataFrame(parsed_trades)
|
|
return df
|
|
else:
|
|
return pd.DataFrame()
|
|
|
|
def analyze_trades(df):
|
|
"""Analyze trades for potential issues"""
|
|
if df.empty:
|
|
print("No trades to analyze")
|
|
return
|
|
|
|
print(f"\n{'='*50}")
|
|
print("TRADE AUDIT RESULTS")
|
|
print(f"{'='*50}")
|
|
print(f"Total trades analyzed: {len(df)}")
|
|
|
|
# Check for duplicate entry prices
|
|
entry_price_counts = df['entry_price'].value_counts()
|
|
duplicate_entries = entry_price_counts[entry_price_counts > 1]
|
|
|
|
print(f"\n{'='*20} DUPLICATE ENTRY PRICES {'='*20}")
|
|
if not duplicate_entries.empty:
|
|
print(f"Found {len(duplicate_entries)} prices with multiple entries:")
|
|
for price, count in duplicate_entries.items():
|
|
print(f" ${price:.2f}: {count} trades")
|
|
|
|
# Analyze the duplicate entry trades in more detail
|
|
for price in duplicate_entries.index:
|
|
duplicate_df = df[df['entry_price'] == price].copy()
|
|
duplicate_df['time_diff'] = duplicate_df['time'].diff().dt.total_seconds()
|
|
|
|
print(f"\nDetailed analysis for entry price ${price:.2f}:")
|
|
print(f" Time gaps between consecutive trades:")
|
|
for i, (_, row) in enumerate(duplicate_df.iterrows()):
|
|
if i > 0: # Skip first row as it has no previous trade
|
|
time_diff = row['time_diff']
|
|
if pd.notna(time_diff):
|
|
print(f" {row['time'].strftime('%H:%M:%S')}: {time_diff:.0f} seconds after previous trade")
|
|
else:
|
|
print("No duplicate entry prices found")
|
|
|
|
# Check for rapid consecutive trades
|
|
df = df.sort_values('time')
|
|
df['time_since_last'] = df['time'].diff().dt.total_seconds()
|
|
|
|
rapid_trades = df[df['time_since_last'] < 30].copy()
|
|
|
|
print(f"\n{'='*20} RAPID CONSECUTIVE TRADES {'='*20}")
|
|
if not rapid_trades.empty:
|
|
print(f"Found {len(rapid_trades)} trades executed within 30 seconds of previous trade:")
|
|
for _, row in rapid_trades.iterrows():
|
|
if pd.notna(row['time_since_last']):
|
|
print(f" {row['time'].strftime('%H:%M:%S')} - {row['side']} ${row['size']:.2f} @ ${row['entry_price']:.2f} - {row['time_since_last']:.0f}s after previous")
|
|
else:
|
|
print("No rapid consecutive trades found")
|
|
|
|
# Check for P&L calculation accuracy
|
|
pnl_diff = df[abs(df['pnl_difference']) > 0.01].copy()
|
|
|
|
print(f"\n{'='*20} P&L CALCULATION ISSUES {'='*20}")
|
|
if not pnl_diff.empty:
|
|
print(f"Found {len(pnl_diff)} trades with P&L calculation discrepancies:")
|
|
for _, row in pnl_diff.iterrows():
|
|
print(f" {row['time'].strftime('%H:%M:%S')} - {row['side']} - Reported: ${row['pnl']:.2f}, Expected: ${row['expected_pnl']:.2f}, Diff: ${row['pnl_difference']:.2f}")
|
|
else:
|
|
print("No P&L calculation issues found")
|
|
|
|
# Check for side distribution
|
|
side_counts = df['side'].value_counts()
|
|
|
|
print(f"\n{'='*20} TRADE SIDE DISTRIBUTION {'='*20}")
|
|
for side, count in side_counts.items():
|
|
print(f" {side}: {count} trades ({count/len(df)*100:.1f}%)")
|
|
|
|
# Check for hold time distribution
|
|
print(f"\n{'='*20} HOLD TIME DISTRIBUTION {'='*20}")
|
|
print(f" Min hold time: {df['hold_time'].min():.0f} seconds")
|
|
print(f" Max hold time: {df['hold_time'].max():.0f} seconds")
|
|
print(f" Avg hold time: {df['hold_time'].mean():.0f} seconds")
|
|
print(f" Median hold time: {df['hold_time'].median():.0f} seconds")
|
|
|
|
# Hold time buckets
|
|
hold_buckets = [0, 30, 60, 120, 300, 600, 1800, 3600, float('inf')]
|
|
hold_labels = ['0-30s', '30-60s', '1-2m', '2-5m', '5-10m', '10-30m', '30-60m', '60m+']
|
|
|
|
df['hold_bucket'] = pd.cut(df['hold_time'], bins=hold_buckets, labels=hold_labels)
|
|
hold_dist = df['hold_bucket'].value_counts().sort_index()
|
|
|
|
for bucket, count in hold_dist.items():
|
|
print(f" {bucket}: {count} trades ({count/len(df)*100:.1f}%)")
|
|
|
|
# Generate summary statistics
|
|
print(f"\n{'='*20} TRADE PERFORMANCE SUMMARY {'='*20}")
|
|
winning_trades = df[df['pnl'] > 0]
|
|
losing_trades = df[df['pnl'] < 0]
|
|
|
|
print(f" Win rate: {len(winning_trades)/len(df)*100:.1f}% ({len(winning_trades)}W/{len(losing_trades)}L)")
|
|
print(f" Avg win: ${winning_trades['pnl'].mean():.2f}")
|
|
print(f" Avg loss: ${abs(losing_trades['pnl'].mean()):.2f}")
|
|
print(f" Total P&L: ${df['pnl'].sum():.2f}")
|
|
print(f" Total fees: ${df['fees'].sum():.2f}")
|
|
print(f" Net P&L: ${(df['pnl'].sum() - df['fees'].sum()):.2f}")
|
|
|
|
# Plot entry price distribution
|
|
plt.figure(figsize=(10, 6))
|
|
plt.hist(df['entry_price'], bins=20, alpha=0.7)
|
|
plt.title('Entry Price Distribution')
|
|
plt.xlabel('Entry Price ($)')
|
|
plt.ylabel('Number of Trades')
|
|
plt.grid(True, alpha=0.3)
|
|
plt.savefig('debug/entry_price_distribution.png')
|
|
|
|
# Plot P&L distribution
|
|
plt.figure(figsize=(10, 6))
|
|
plt.hist(df['pnl'], bins=20, alpha=0.7)
|
|
plt.title('P&L Distribution')
|
|
plt.xlabel('P&L ($)')
|
|
plt.ylabel('Number of Trades')
|
|
plt.grid(True, alpha=0.3)
|
|
plt.savefig('debug/pnl_distribution.png')
|
|
|
|
print(f"\n{'='*20} AUDIT COMPLETE {'='*20}")
|
|
print("Plots saved to debug/entry_price_distribution.png and debug/pnl_distribution.png")
|
|
|
|
def analyze_manual_trades(trades_data):
|
|
"""Analyze manually provided trade data"""
|
|
# Parse the trade data into a structured format
|
|
parsed_trades = []
|
|
|
|
for line in trades_data.strip().split('\n'):
|
|
if not line or line.startswith('from last session') or line.startswith('Recent Closed Trades') or line.startswith('Trading Performance'):
|
|
continue
|
|
|
|
if line.startswith('Win Rate:'):
|
|
# This is the summary line, skip it
|
|
continue
|
|
|
|
try:
|
|
# Parse trade line format: Time Side Size Entry Exit Hold P&L Fees
|
|
parts = line.split('$')
|
|
|
|
time_side = parts[0].strip().split()
|
|
time = time_side[0]
|
|
side = time_side[1]
|
|
|
|
size = float(parts[1].split()[0])
|
|
entry = float(parts[2].split()[0])
|
|
exit = float(parts[3].split()[0])
|
|
|
|
# The hold time and P&L are in the last parts
|
|
remaining = parts[3].split()
|
|
hold = int(remaining[1])
|
|
pnl = float(parts[4].split()[0])
|
|
|
|
# Fees might be in a different format
|
|
if len(parts) > 5:
|
|
fees = float(parts[5].strip())
|
|
else:
|
|
fees = 0.0
|
|
|
|
parsed_trade = {
|
|
'time': parse_trade_time(time),
|
|
'side': side,
|
|
'size': size,
|
|
'entry_price': entry,
|
|
'exit_price': exit,
|
|
'hold_time': hold,
|
|
'pnl': pnl,
|
|
'fees': fees
|
|
}
|
|
|
|
# Calculate expected P&L
|
|
if side == 'LONG' or side == 'BUY':
|
|
expected_pnl = (exit - entry) * size
|
|
else: # SHORT or SELL
|
|
expected_pnl = (entry - exit) * size
|
|
|
|
parsed_trade['expected_pnl'] = expected_pnl
|
|
parsed_trade['pnl_difference'] = pnl - expected_pnl
|
|
|
|
parsed_trades.append(parsed_trade)
|
|
|
|
except Exception as e:
|
|
print(f"Error parsing trade line: {line}")
|
|
print(f"Error details: {e}")
|
|
|
|
# Convert to DataFrame
|
|
if parsed_trades:
|
|
df = pd.DataFrame(parsed_trades)
|
|
return df
|
|
else:
|
|
return pd.DataFrame()
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Trade Audit Tool')
|
|
parser.add_argument('--trades-file', type=str, help='Path to trades JSON file')
|
|
parser.add_argument('--manual-trades', type=str, help='Path to text file with manually entered trades')
|
|
args = parser.parse_args()
|
|
|
|
# Create debug directory if it doesn't exist
|
|
os.makedirs('debug', exist_ok=True)
|
|
|
|
if args.trades_file:
|
|
trades_data = load_trades_from_file(args.trades_file)
|
|
df = parse_trade_data(trades_data)
|
|
elif args.manual_trades:
|
|
try:
|
|
with open(args.manual_trades, 'r') as f:
|
|
manual_trades = f.read()
|
|
df = analyze_manual_trades(manual_trades)
|
|
except Exception as e:
|
|
print(f"Error reading manual trades file: {e}")
|
|
df = pd.DataFrame()
|
|
else:
|
|
# Try to load from dashboard cache
|
|
trades_data = load_trades_from_dashboard_cache()
|
|
if trades_data:
|
|
df = parse_trade_data(trades_data)
|
|
else:
|
|
print("No trade data provided. Use --trades-file or --manual-trades")
|
|
return
|
|
|
|
if not df.empty:
|
|
analyze_trades(df)
|
|
else:
|
|
print("No valid trade data to analyze")
|
|
|
|
if __name__ == "__main__":
|
|
main() |