301 lines
9.1 KiB
Python
301 lines
9.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test GPU Training - Check if our models actually train and use GPU
|
|
"""
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.optim as optim
|
|
import numpy as np
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
import sys
|
|
|
|
# Configure logging for the script: INFO and above go to the root handler,
# and this module gets its own named logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Put the directory that contains this file at the front of the import path
# so the project-local imports done inside the tests can be resolved.
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
|
|
|
|
def test_gpu_availability():
    """Report the PyTorch/CUDA setup and run a small matmul on the first GPU.

    Prints the PyTorch version, whether CUDA is available and, if so, the
    CUDA version plus the name and total memory of every visible device.
    Then multiplies two 100x100 random matrices on ``cuda:0``.

    Returns:
        bool: True when CUDA is available and the matrix multiply finishes
        without raising; False when CUDA is unavailable or the operation
        fails.
    """
    logger.info("=== GPU AVAILABILITY TEST ===")

    cuda_available = torch.cuda.is_available()
    print(f"PyTorch version: {torch.__version__}")
    print(f"CUDA available: {cuda_available}")

    if not cuda_available:
        print("❌ No CUDA available")
        return False

    print(f"CUDA version: {torch.version.cuda}")
    gpu_count = torch.cuda.device_count()
    print(f"GPU count: {gpu_count}")
    for i in range(gpu_count):
        print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
        print(f" Memory: {torch.cuda.get_device_properties(i).total_memory / 1024**3:.1f} GB")

    # Test GPU operations
    try:
        device = torch.device('cuda:0')
        x = torch.randn(100, 100, device=device)
        y = torch.randn(100, 100, device=device)
        z = torch.mm(x, y)
        # CUDA kernels are launched asynchronously; wait for completion so
        # that a failing kernel raises here instead of being reported as
        # a success.
        torch.cuda.synchronize(device)
        print(f"✅ GPU operations working: {z.device}")
        return True
    except Exception as e:
        print(f"❌ GPU operations failed: {e}")
        return False
|
|
|
|
def test_simple_training():
    """Fit a small MLP on random data and check that the loss goes down.

    Builds a 10 -> 64 -> 32 -> 3 fully connected network, runs 100
    full-batch Adam steps (lr=0.001) with cross-entropy loss on 1000 random
    samples, and prints progress every 20 steps.

    Returns:
        bool: True if the last recorded loss is below 90% of the first
        recorded loss, otherwise False.
    """
    logger.info("=== SIMPLE TRAINING TEST ===")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # The network is created before the data so the random-number stream is
    # consumed in the same order: weights first, then inputs and labels.
    model = nn.Sequential(
        nn.Linear(10, 64),
        nn.ReLU(),
        nn.Linear(64, 32),
        nn.ReLU(),
        nn.Linear(32, 3),
    ).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()

    # Dummy inputs and class labels, allocated directly on the target device.
    X = torch.randn(1000, 10, device=device)
    y = torch.randint(0, 3, (1000,), device=device)

    param_count = sum(p.numel() for p in model.parameters())
    print(f"Model parameters: {param_count:,}")
    print(f"Data shape: {X.shape}, Labels shape: {y.shape}")

    losses = []

    print("Training for 100 steps...")
    start_time = time.time()

    for step in range(100):
        optimizer.zero_grad()
        loss = criterion(model(X), y)
        loss.backward()
        optimizer.step()

        # .item() copies the scalar to the host, one value per step.
        loss_val = loss.item()
        losses.append(loss_val)

        if step % 20 == 0:
            print(f"Step {step}: Loss = {loss_val:.4f}")

    elapsed = time.time() - start_time
    initial_loss = losses[0]
    final_loss = losses[-1]

    print(f"Training completed in {elapsed:.2f} seconds")
    print(f"Initial loss: {initial_loss:.4f}")
    print(f"Final loss: {final_loss:.4f}")
    print(f"Loss reduction: {initial_loss - final_loss:.4f}")

    # Require at least a 10% reduction to count as "training happened".
    if not final_loss < initial_loss * 0.9:
        print("❌ Training may not be working - loss didn't decrease much")
        return False

    print("✅ Training is working - loss decreased significantly")
    return True
|
|
|
|
def test_our_models():
    """Check that the project's DQN agent can be built and run replay steps.

    Imports ``DQNAgent`` from ``NN.models.dqn_agent``, creates an agent on
    the GPU when available (CPU otherwise), stores 201 random transitions
    via ``agent.remember`` and calls ``agent.replay()`` ten times.

    Returns:
        bool: True only if at least one ``replay()`` call reported a
        positive loss. False if no loss was ever reported or if any step
        raised (the traceback is printed in that case).
    """
    logger.info("=== OUR MODELS TEST ===")

    try:
        # Test DQN Agent
        from NN.models.dqn_agent import DQNAgent

        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Testing DQN Agent on {device}")

        # Create agent
        state_shape = (100,)  # Simple state
        n_actions = 3
        agent = DQNAgent(
            state_shape=state_shape,
            n_actions=n_actions,
            learning_rate=0.001,
            device=device
        )

        print("✅ DQN Agent created successfully")
        print(f" Device: {agent.device}")
        print(f" Policy net device: {next(agent.policy_net.parameters()).device}")

        # One hand-written transition first ...
        state = np.random.randn(*state_shape).astype(np.float32)
        action = 1
        reward = 0.5
        next_state = np.random.randn(*state_shape).astype(np.float32)
        done = False
        agent.remember(state, action, reward, next_state, done)

        # ... then random ones so a batch can be sampled.
        for _ in range(200):  # Need enough for batch
            s = np.random.randn(*state_shape).astype(np.float32)
            a = np.random.randint(0, n_actions)
            r = np.random.randn() * 0.1
            ns = np.random.randn(*state_shape).astype(np.float32)
            d = np.random.random() < 0.1
            agent.remember(s, a, r, ns, d)

        # Test training
        print("Testing training step...")
        initial_loss = None
        for i in range(10):
            loss = agent.replay()
            # replay() may report nothing for a step; treat that as "no
            # loss" instead of letting the comparison below raise TypeError.
            if loss is None:
                continue
            if loss > 0:
                if initial_loss is None:
                    initial_loss = loss
                print(f" Step {i}: Loss = {loss:.4f}")

        if initial_loss is None:
            # A run without any reported loss is a failure and must not be
            # summarised as a pass by the caller.
            print("❌ DQN training returned no loss")
            return False

        print("✅ DQN training is working")
        return True

    except Exception as e:
        print(f"❌ Error testing our models: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
def test_cnn_model():
    """Run one forward pass and one optimisation step through EnhancedCNN.

    Imports ``EnhancedCNN`` from ``NN.models.enhanced_cnn``, builds it for a
    (3, 20, 26) state and 3 actions, feeds a random batch of 32 samples and
    performs a single Adam update using cross-entropy on the first element
    of the model's output tuple.

    Returns:
        bool: True if the forward pass returns a tuple and the training
        step completes; False if the output is not a tuple or anything
        raises (the traceback is printed in that case).
    """
    logger.info("=== CNN MODEL TEST ===")

    try:
        from NN.models.enhanced_cnn import EnhancedCNN

        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Testing Enhanced CNN on {device}")

        # Model, optimiser and loss
        state_dim = (3, 20, 26)  # 3 timeframes, 20 window, 26 features
        n_actions = 3

        model = EnhancedCNN(state_dim, n_actions).to(device)
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.CrossEntropyLoss()

        total_params = sum(p.numel() for p in model.parameters())
        print("✅ Enhanced CNN created successfully")
        print(f" Parameters: {total_params:,}")

        # Forward pass on a random batch shaped (batch, *state_dim)
        batch_size = 32
        x = torch.randn(batch_size, *state_dim, device=device)

        print("Testing forward pass...")
        outputs = model(x)

        if not isinstance(outputs, tuple):
            print(f"❌ Unexpected output format: {type(outputs)}")
            return False

        # Exactly five outputs are expected; only two are inspected here.
        action_probs, _extrema, _price, features, _advanced = outputs
        print("✅ Forward pass successful")
        print(f" Action probs shape: {action_probs.shape}")
        print(f" Features shape: {features.shape}")

        # Single training step against random class targets
        y = torch.randint(0, n_actions, (batch_size,), device=device)

        print("Testing training step...")
        # NOTE(review): nn.CrossEntropyLoss expects unnormalised logits; if
        # `action_probs` is already softmax output this loss is only a
        # smoke test - confirm against the model definition.
        loss = criterion(action_probs, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print(f"✅ CNN training step successful, loss: {loss.item():.4f}")
        return True

    except Exception as e:
        print(f"❌ Error testing CNN model: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|
|
|
def main():
    """Run every check in order, print a summary and return an exit code.

    The checks run in a fixed order (GPU availability, simple training,
    DQN agent, CNN model), each followed by a blank line. Afterwards a
    PASS/FAIL line is printed per check, plus a hint for each failed one.

    Returns:
        int: 0 if all checks returned a truthy value, otherwise 1.
    """
    banner = "=" * 60

    print(banner)
    print("TESTING GPU TRAINING FUNCTIONALITY")
    print(banner)

    # (result key, check to run, hint printed when the check fails)
    suite = (
        ('gpu', test_gpu_availability, " → GPU not available or not working"),
        ('simple_training', test_simple_training, " → Basic training loop not working"),
        ('dqn_models', test_our_models, " → DQN models have issues"),
        ('cnn_models', test_cnn_model, " → CNN models have issues"),
    )

    results = {}
    for key, check, _hint in suite:
        results[key] = check()
        print()

    # Summary
    print(banner)
    print("TEST RESULTS SUMMARY")
    print(banner)

    for key, ok in results.items():
        verdict = "✅ PASS" if ok else "❌ FAIL"
        print(f"{key.upper()}: {verdict}")

    all_passed = all(results.values())

    if all_passed:
        print("\n🎉 ALL TESTS PASSED - Your training should work with GPU!")
        return 0

    print("\n⚠️ SOME TESTS FAILED - Check the issues above")
    for key, _check, hint in suite:
        if not results[key]:
            print(hint)

    return 1
|
|
|
|
# Script entry point: the process exit status mirrors main()'s result
# (0 when every check passed, 1 otherwise).
if __name__ == "__main__":
    sys.exit(main())