Compare commits
1d09b3778e ... e7ea17b626

3 Commits:
  e7ea17b626
  8685319989
  6a4a73ff0b
@@ -443,11 +443,33 @@ class EnhancedCNNModel(nn.Module):
         # Forward pass
         outputs = self.forward(x)
 
-        # Extract results
+        # Extract results with proper shape handling
         probs = outputs['probabilities'].cpu().numpy()[0]
-        confidence = outputs['confidence'].cpu().numpy()[0]
+        confidence_tensor = outputs['confidence'].cpu().numpy()
         regime = outputs['regime'].cpu().numpy()[0]
-        volatility = outputs['volatility'].cpu().numpy()[0]
+        volatility = outputs['volatility'].cpu().numpy()
 
+        # Handle confidence shape properly
+        if isinstance(confidence_tensor, np.ndarray):
+            if confidence_tensor.ndim == 0:
+                confidence = float(confidence_tensor.item())
+            elif confidence_tensor.size == 1:
+                confidence = float(confidence_tensor.flatten()[0])
+            else:
+                confidence = float(confidence_tensor[0] if len(confidence_tensor) > 0 else 0.7)
+        else:
+            confidence = float(confidence_tensor)
+
+        # Handle volatility shape properly
+        if isinstance(volatility, np.ndarray):
+            if volatility.ndim == 0:
+                volatility = float(volatility.item())
+            elif volatility.size == 1:
+                volatility = float(volatility.flatten()[0])
+            else:
+                volatility = float(volatility[0] if len(volatility) > 0 else 0.0)
+        else:
+            volatility = float(volatility)
+
         # Determine action (0=BUY, 1=SELL for 2-action system)
         action = int(np.argmax(probs))
@@ -396,11 +396,33 @@ class EnhancedCNNModel(nn.Module):
         # Forward pass
         outputs = self.forward(x)
 
-        # Extract results
+        # Extract results with proper shape handling
        probs = outputs['probabilities'].cpu().numpy()[0]
-        confidence = outputs['confidence'].cpu().numpy()[0]
+        confidence_tensor = outputs['confidence'].cpu().numpy()
         regime = outputs['regime'].cpu().numpy()[0]
-        volatility = outputs['volatility'].cpu().numpy()[0]
+        volatility_tensor = outputs['volatility'].cpu().numpy()
 
+        # Handle confidence shape properly to avoid scalar conversion errors
+        if isinstance(confidence_tensor, np.ndarray):
+            if confidence_tensor.ndim == 0:
+                confidence = float(confidence_tensor.item())
+            elif confidence_tensor.size == 1:
+                confidence = float(confidence_tensor.flatten()[0])
+            else:
+                confidence = float(confidence_tensor[0] if len(confidence_tensor) > 0 else 0.7)
+        else:
+            confidence = float(confidence_tensor)
+
+        # Handle volatility shape properly
+        if isinstance(volatility_tensor, np.ndarray):
+            if volatility_tensor.ndim == 0:
+                volatility = float(volatility_tensor.item())
+            elif volatility_tensor.size == 1:
+                volatility = float(volatility_tensor.flatten()[0])
+            else:
+                volatility = float(volatility_tensor[0] if len(volatility_tensor) > 0 else 0.0)
+        else:
+            volatility = float(volatility_tensor)
+
         # Determine action (0=BUY, 1=SELL for 2-action system)
         action = int(np.argmax(probs))
@@ -409,11 +431,11 @@ class EnhancedCNNModel(nn.Module):
         return {
             'action': action,
             'action_name': 'BUY' if action == 0 else 'SELL',
-            'confidence': float(confidence),
+            'confidence': confidence,  # Already converted to float above
             'action_confidence': action_confidence,
             'probabilities': probs.tolist(),
             'regime_probabilities': regime.tolist(),
-            'volatility_prediction': float(volatility),
+            'volatility_prediction': volatility,  # Already converted to float above
             'raw_logits': outputs['logits'].cpu().numpy()[0].tolist()
         }
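Note: the three hunks above all guard the same failure, calling float() or .item() on a NumPy array that is not guaranteed to hold exactly one element. A standalone sketch of that conversion pattern outside the model (the helper name to_scalar and the sample values are illustrative, not part of this change set):

import numpy as np

def to_scalar(value, default=0.0):
    """Convert a 0-d, single-element, or multi-element array to a Python float."""
    if isinstance(value, np.ndarray):
        if value.ndim == 0:
            return float(value.item())
        if value.size == 1:
            return float(value.flatten()[0])
        return float(value[0]) if len(value) > 0 else default
    return float(value)

# float() on a multi-element array raises "only length-1 arrays can be converted",
# which is exactly the error the shape handling above avoids.
print(to_scalar(np.array(0.82)))          # 0-d array  -> 0.82
print(to_scalar(np.array([0.82])))        # (1,) array -> 0.82
print(to_scalar(np.array([0.82, 0.11])))  # (2,) array -> 0.82 (first element)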
@@ -501,7 +501,16 @@ class EnhancedCNNWithOrderBook(nn.Module):
         # Get probabilities
         q_values = outputs['q_values']
         probs = F.softmax(q_values, dim=1)
-        confidence = outputs['confidence'].item()
+
+        # Handle confidence shape properly to avoid scalar conversion errors
+        confidence_tensor = outputs['confidence']
+        if isinstance(confidence_tensor, torch.Tensor):
+            if confidence_tensor.numel() == 1:
+                confidence = confidence_tensor.item()
+            else:
+                confidence = confidence_tensor.flatten()[0].item()
+        else:
+            confidence = float(confidence_tensor)
 
         # Action selection with confidence thresholding
         if confidence >= self.confidence_threshold:
@@ -142,6 +142,16 @@ class DataProvider:
             binance_symbol = symbol.replace('/', '').upper()
             self.tick_buffers[binance_symbol] = deque(maxlen=self.buffer_size)
 
+        # BOM (Book of Market) data caching - 1s resolution for last 5 minutes
+        self.bom_cache_duration = 300  # 5 minutes in seconds
+        self.bom_feature_count = 120  # Number of BOM features per timestamp
+        self.bom_data_cache: Dict[str, deque] = {}  # {symbol: deque of (timestamp, bom_features)}
+
+        # Initialize BOM cache for each symbol
+        for symbol in self.symbols:
+            # Store 300 seconds worth of 1s BOM data
+            self.bom_data_cache[symbol] = deque(maxlen=self.bom_cache_duration)
+
         # Initialize tick aggregator for raw tick processing
         binance_symbols = [symbol.replace('/', '').upper() for symbol in self.symbols]
         self.tick_aggregator = RealTimeTickAggregator(symbols=binance_symbols)
@@ -1497,8 +1507,15 @@ class DataProvider:
             timeframe_secs = self.timeframe_seconds.get(timeframe, 3600)
             current_time = tick['timestamp']
 
-            # Calculate candle start time
-            candle_start = current_time.floor(f'{timeframe_secs}s')
+            # Calculate candle start time using proper datetime truncation
+            if isinstance(current_time, datetime):
+                timestamp_seconds = current_time.timestamp()
+            else:
+                timestamp_seconds = current_time.timestamp() if hasattr(current_time, 'timestamp') else current_time
+
+            # Truncate to timeframe boundary
+            candle_start_seconds = int(timestamp_seconds // timeframe_secs) * timeframe_secs
+            candle_start = datetime.fromtimestamp(candle_start_seconds)
 
             # Get current candle queue
             candle_queue = self.real_time_data[symbol][timeframe]
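Note: the replacement works because integer floor division of the POSIX timestamp snaps any time to the start of its bucket. A minimal illustration with a 60-second timeframe (the specific times and timeframe are examples only):

from datetime import datetime

tick_time = datetime(2024, 1, 1, 12, 34, 56)
timeframe_secs = 60

# Same arithmetic as the hunk above: floor the epoch seconds to the bucket boundary.
timestamp_seconds = tick_time.timestamp()
candle_start_seconds = int(timestamp_seconds // timeframe_secs) * timeframe_secs
candle_start = datetime.fromtimestamp(candle_start_seconds)

print(candle_start)  # 2024-01-01 12:34:00 - start of the 1-minute candle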
@@ -2083,4 +2100,363 @@ class DataProvider:
             'distribution_stats': self.distribution_stats.copy(),
             'buffer_sizes': {symbol: len(buffer) for symbol, buffer in self.tick_buffers.items()},
             'tick_aggregator': aggregator_stats
         }
 
+    def update_bom_cache(self, symbol: str, bom_features: List[float], cob_integration=None):
+        """
+        Update BOM cache with latest features for a symbol
+
+        Args:
+            symbol: Trading symbol (e.g., 'ETH/USDT')
+            bom_features: List of BOM features (should be 120 features)
+            cob_integration: Optional COB integration instance for real BOM data
+        """
+        try:
+            current_time = datetime.now()
+
+            # Ensure we have exactly 120 features
+            if len(bom_features) != self.bom_feature_count:
+                if len(bom_features) > self.bom_feature_count:
+                    bom_features = bom_features[:self.bom_feature_count]
+                else:
+                    bom_features.extend([0.0] * (self.bom_feature_count - len(bom_features)))
+
+            # Convert to numpy array for efficient storage
+            bom_array = np.array(bom_features, dtype=np.float32)
+
+            # Add timestamp and features to cache
+            with self.data_lock:
+                self.bom_data_cache[symbol].append((current_time, bom_array))
+
+            logger.debug(f"Updated BOM cache for {symbol}: {len(self.bom_data_cache[symbol])} timestamps cached")
+
+        except Exception as e:
+            logger.error(f"Error updating BOM cache for {symbol}: {e}")
+
+    def get_bom_matrix_for_cnn(self, symbol: str, sequence_length: int = 50) -> Optional[np.ndarray]:
+        """
+        Get BOM matrix for CNN input from cached 1s data
+
+        Args:
+            symbol: Trading symbol (e.g., 'ETH/USDT')
+            sequence_length: Required sequence length (default 50)
+
+        Returns:
+            np.ndarray: BOM matrix of shape (sequence_length, 120) or None if insufficient data
+        """
+        try:
+            with self.data_lock:
+                if symbol not in self.bom_data_cache or len(self.bom_data_cache[symbol]) == 0:
+                    logger.warning(f"No BOM data cached for {symbol}")
+                    return None
+
+                # Get recent data
+                cached_data = list(self.bom_data_cache[symbol])
+
+                if len(cached_data) < sequence_length:
+                    logger.warning(f"Insufficient BOM data for {symbol}: {len(cached_data)} < {sequence_length}")
+                    # Pad with zeros if we don't have enough data
+                    bom_matrix = np.zeros((sequence_length, self.bom_feature_count), dtype=np.float32)
+
+                    # Fill available data at the end
+                    for i, (timestamp, features) in enumerate(cached_data):
+                        if i < sequence_length:
+                            bom_matrix[sequence_length - len(cached_data) + i] = features
+
+                    return bom_matrix
+
+                # Take the most recent sequence_length samples
+                recent_data = cached_data[-sequence_length:]
+
+                # Create matrix
+                bom_matrix = np.zeros((sequence_length, self.bom_feature_count), dtype=np.float32)
+                for i, (timestamp, features) in enumerate(recent_data):
+                    bom_matrix[i] = features
+
+                logger.debug(f"Retrieved BOM matrix for {symbol}: shape={bom_matrix.shape}")
+                return bom_matrix
+
+        except Exception as e:
+            logger.error(f"Error getting BOM matrix for {symbol}: {e}")
+            return None
+
+    def generate_synthetic_bom_features(self, symbol: str) -> List[float]:
+        """
+        Generate synthetic BOM features when real COB data is not available
+
+        This creates realistic-looking order book features based on current market data
+        """
+        try:
+            features = []
+
+            # Get current price for context
+            current_price = self.get_current_price(symbol)
+            if current_price is None:
+                current_price = 3000.0  # Fallback price
+
+            # === 1. CONSOLIDATED ORDER BOOK DATA (40 features) ===
+            # Top 10 bid levels (price offset + volume)
+            for i in range(10):
+                price_offset = -0.001 * (i + 1) * (1 + np.random.normal(0, 0.1))  # Negative for bids
+                volume_normalized = np.random.exponential(0.5) * (1.0 - i * 0.1)  # Decreasing with depth
+                features.extend([price_offset, volume_normalized])
+
+            # Top 10 ask levels (price offset + volume)
+            for i in range(10):
+                price_offset = 0.001 * (i + 1) * (1 + np.random.normal(0, 0.1))  # Positive for asks
+                volume_normalized = np.random.exponential(0.5) * (1.0 - i * 0.1)  # Decreasing with depth
+                features.extend([price_offset, volume_normalized])
+
+            # === 2. VOLUME PROFILE FEATURES (30 features) ===
+            # Top 10 volume levels (buy%, sell%, total volume)
+            for i in range(10):
+                buy_percent = 0.3 + np.random.normal(0, 0.2)  # Around 30-70% buy
+                buy_percent = max(0.0, min(1.0, buy_percent))
+                sell_percent = 1.0 - buy_percent
+                total_volume = np.random.exponential(1.0) * (1.0 - i * 0.05)
+                features.extend([buy_percent, sell_percent, total_volume])
+
+            # === 3. ORDER FLOW INTENSITY (25 features) ===
+            # Aggressive order flow
+            features.extend([
+                0.5 + np.random.normal(0, 0.1),   # Aggressive buy ratio
+                0.5 + np.random.normal(0, 0.1),   # Aggressive sell ratio
+                0.4 + np.random.normal(0, 0.1),   # Buy volume ratio
+                0.4 + np.random.normal(0, 0.1),   # Sell volume ratio
+                np.random.exponential(100),       # Avg aggressive buy size
+                np.random.exponential(100),       # Avg aggressive sell size
+            ])
+
+            # Block trade detection
+            features.extend([
+                0.1 + np.random.exponential(0.05),  # Large trade ratio
+                0.2 + np.random.exponential(0.1),   # Large trade volume ratio
+                np.random.exponential(1000),        # Avg large trade size
+            ])
+
+            # Flow velocity metrics
+            features.extend([
+                1.0 + np.random.normal(0, 0.2),     # Avg time delta
+                0.1 + np.random.exponential(0.05),  # Time velocity variance
+                0.5 + np.random.normal(0, 0.1),     # Trade clustering
+            ])
+
+            # Institutional activity indicators
+            features.extend([
+                0.05 + np.random.exponential(0.02),  # Iceberg detection
+                0.3 + np.random.normal(0, 0.1),      # Hidden order ratio
+                0.2 + np.random.normal(0, 0.05),     # Smart money flow
+                0.1 + np.random.exponential(0.03),   # Algorithmic activity
+            ])
+
+            # Market maker behavior
+            features.extend([
+                0.6 + np.random.normal(0, 0.1),      # MM provision ratio
+                0.4 + np.random.normal(0, 0.1),      # MM take ratio
+                0.02 + np.random.normal(0, 0.005),   # Spread tightening
+                1.0 + np.random.normal(0, 0.2),      # Quote update frequency
+                0.8 + np.random.normal(0, 0.1),      # Quote stability
+            ])
+
+            # === 4. MARKET MICROSTRUCTURE SIGNALS (25 features) ===
+            # Order book pressure
+            features.extend([
+                0.5 + np.random.normal(0, 0.1),   # Bid pressure
+                0.5 + np.random.normal(0, 0.1),   # Ask pressure
+                0.0 + np.random.normal(0, 0.05),  # Pressure imbalance
+                1.0 + np.random.normal(0, 0.2),   # Pressure intensity
+                0.5 + np.random.normal(0, 0.1),   # Depth stability
+            ])
+
+            # Price level concentration
+            features.extend([
+                0.3 + np.random.normal(0, 0.1),   # Bid concentration
+                0.3 + np.random.normal(0, 0.1),   # Ask concentration
+                0.8 + np.random.normal(0, 0.1),   # Top level dominance
+                0.2 + np.random.normal(0, 0.05),  # Fragmentation index
+                0.6 + np.random.normal(0, 0.1),   # Liquidity clustering
+            ])
+
+            # Temporal dynamics
+            features.extend([
+                0.1 + np.random.normal(0, 0.02),  # Volatility factor
+                1.0 + np.random.normal(0, 0.1),   # Momentum factor
+                0.0 + np.random.normal(0, 0.05),  # Mean reversion
+                0.5 + np.random.normal(0, 0.1),   # Trend alignment
+                0.8 + np.random.normal(0, 0.1),   # Pattern consistency
+            ])
+
+            # Exchange-specific patterns
+            features.extend([
+                0.4 + np.random.normal(0, 0.1),   # Cross-exchange correlation
+                0.3 + np.random.normal(0, 0.1),   # Exchange arbitrage
+                0.2 + np.random.normal(0, 0.05),  # Latency patterns
+                0.8 + np.random.normal(0, 0.1),   # Sync quality
+                0.6 + np.random.normal(0, 0.1),   # Data freshness
+            ])
+
+            # Ensure exactly 120 features
+            if len(features) > 120:
+                features = features[:120]
+            elif len(features) < 120:
+                features.extend([0.0] * (120 - len(features)))
+
+            # Clamp all values to reasonable ranges
+            features = [max(-5.0, min(5.0, f)) for f in features]
+
+            return features
+
+        except Exception as e:
+            logger.error(f"Error generating synthetic BOM features for {symbol}: {e}")
+            return [0.0] * 120
+
+    def start_bom_cache_updates(self, cob_integration=None):
+        """
+        Start background updates of BOM cache every second
+
+        Args:
+            cob_integration: Optional COB integration instance for real data
+        """
+        try:
+            def update_loop():
+                while self.is_streaming:
+                    try:
+                        for symbol in self.symbols:
+                            if cob_integration:
+                                # Try to get real BOM features from COB integration
+                                try:
+                                    bom_features = self._extract_real_bom_features(symbol, cob_integration)
+                                    if bom_features:
+                                        self.update_bom_cache(symbol, bom_features, cob_integration)
+                                    else:
+                                        # Fallback to synthetic
+                                        synthetic_features = self.generate_synthetic_bom_features(symbol)
+                                        self.update_bom_cache(symbol, synthetic_features)
+                                except Exception as e:
+                                    logger.warning(f"Error getting real BOM features for {symbol}: {e}")
+                                    synthetic_features = self.generate_synthetic_bom_features(symbol)
+                                    self.update_bom_cache(symbol, synthetic_features)
+                            else:
+                                # Generate synthetic BOM features
+                                synthetic_features = self.generate_synthetic_bom_features(symbol)
+                                self.update_bom_cache(symbol, synthetic_features)
+
+                        time.sleep(1.0)  # Update every second
+
+                    except Exception as e:
+                        logger.error(f"Error in BOM cache update loop: {e}")
+                        time.sleep(5.0)  # Wait longer on error
+
+            # Start background thread
+            bom_thread = Thread(target=update_loop, daemon=True)
+            bom_thread.start()
+
+            logger.info("Started BOM cache updates (1s resolution)")
+
+        except Exception as e:
+            logger.error(f"Error starting BOM cache updates: {e}")
+
+    def _extract_real_bom_features(self, symbol: str, cob_integration) -> Optional[List[float]]:
+        """Extract real BOM features from COB integration"""
+        try:
+            features = []
+
+            # Get consolidated order book
+            if hasattr(cob_integration, 'get_consolidated_orderbook'):
+                cob_snapshot = cob_integration.get_consolidated_orderbook(symbol)
+                if cob_snapshot:
+                    # Extract order book features (40 features)
+                    features.extend(self._extract_orderbook_features(cob_snapshot))
+                else:
+                    features.extend([0.0] * 40)
+            else:
+                features.extend([0.0] * 40)
+
+            # Get volume profile features (30 features)
+            if hasattr(cob_integration, 'get_session_volume_profile'):
+                volume_profile = cob_integration.get_session_volume_profile(symbol)
+                if volume_profile:
+                    features.extend(self._extract_volume_profile_features(volume_profile))
+                else:
+                    features.extend([0.0] * 30)
+            else:
+                features.extend([0.0] * 30)
+
+            # Add flow and microstructure features (50 features)
+            features.extend(self._extract_flow_microstructure_features(symbol, cob_integration))
+
+            # Ensure exactly 120 features
+            if len(features) > 120:
+                features = features[:120]
+            elif len(features) < 120:
+                features.extend([0.0] * (120 - len(features)))
+
+            return features
+
+        except Exception as e:
+            logger.warning(f"Error extracting real BOM features for {symbol}: {e}")
+            return None
+
+    def _extract_orderbook_features(self, cob_snapshot) -> List[float]:
+        """Extract order book features from COB snapshot"""
+        features = []
+
+        try:
+            # Top 10 bid levels
+            for i in range(10):
+                if i < len(cob_snapshot.consolidated_bids):
+                    level = cob_snapshot.consolidated_bids[i]
+                    price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid
+                    volume_normalized = level.total_volume_usd / 1000000
+                    features.extend([price_offset, volume_normalized])
+                else:
+                    features.extend([0.0, 0.0])
+
+            # Top 10 ask levels
+            for i in range(10):
+                if i < len(cob_snapshot.consolidated_asks):
+                    level = cob_snapshot.consolidated_asks[i]
+                    price_offset = (level.price - cob_snapshot.volume_weighted_mid) / cob_snapshot.volume_weighted_mid
+                    volume_normalized = level.total_volume_usd / 1000000
+                    features.extend([price_offset, volume_normalized])
+                else:
+                    features.extend([0.0, 0.0])
+
+        except Exception as e:
+            logger.warning(f"Error extracting order book features: {e}")
+            features = [0.0] * 40
+
+        return features[:40]
+
+    def _extract_volume_profile_features(self, volume_profile) -> List[float]:
+        """Extract volume profile features"""
+        features = []
+
+        try:
+            if 'data' in volume_profile:
+                svp_data = volume_profile['data']
+                top_levels = sorted(svp_data, key=lambda x: x.get('total_volume', 0), reverse=True)[:10]
+
+                for level in top_levels:
+                    buy_percent = level.get('buy_percent', 50.0) / 100.0
+                    sell_percent = level.get('sell_percent', 50.0) / 100.0
+                    total_volume = level.get('total_volume', 0.0) / 1000000
+                    features.extend([buy_percent, sell_percent, total_volume])
+
+            # Pad to 30 features
+            while len(features) < 30:
+                features.extend([0.5, 0.5, 0.0])
+
+        except Exception as e:
+            logger.warning(f"Error extracting volume profile features: {e}")
+            features = [0.0] * 30
+
+        return features[:30]
+
+    def _extract_flow_microstructure_features(self, symbol: str, cob_integration) -> List[float]:
+        """Extract flow and microstructure features"""
+        try:
+            # For now, return synthetic features since full implementation would be complex
+            return self.generate_synthetic_bom_features(symbol)[70:]  # Last 50 features
+        except:
+            return [0.0] * 50
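Note: taken together, these methods give DataProvider a rolling five-minute, 1-second-resolution cache of 120-feature BOM vectors per symbol. A standalone sketch of that cache layout and of assembling the (50, 120) CNN input from it (variable names and the random data are illustrative, not part of the diff):

from collections import deque
from datetime import datetime
import numpy as np

bom_cache = deque(maxlen=300)          # 5 minutes of 1s snapshots, oldest evicted first
for _ in range(120):                   # simulate two minutes of 1s updates
    bom_cache.append((datetime.now(), np.random.rand(120).astype(np.float32)))

sequence_length = 50
recent = list(bom_cache)[-sequence_length:]              # most recent 50 snapshots
bom_matrix = np.stack([features for _, features in recent])
print(bom_matrix.shape)                # (50, 120), ready to pair with market features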
@@ -197,6 +197,14 @@ class EnhancedTradingOrchestrator(TradingOrchestrator):
         self.latest_cob_state: Dict[str, np.ndarray] = {}
         self.cob_feature_history: Dict[str, deque] = {symbol: deque(maxlen=100) for symbol in self.symbols}
 
+        # Start BOM cache updates in data provider
+        if hasattr(self.data_provider, 'start_bom_cache_updates'):
+            try:
+                self.data_provider.start_bom_cache_updates(self.cob_integration)
+                logger.info("Started BOM cache updates in data provider")
+            except Exception as e:
+                logger.warning(f"Failed to start BOM cache updates: {e}")
+
         logger.info("COB Integration: Deferred initialization to prevent sync/async conflicts")
 
         # Williams integration
@@ -759,70 +767,56 @@ class EnhancedTradingOrchestrator(TradingOrchestrator):
 
     def _get_bom_matrix_for_cnn(self, symbol: str) -> Optional[np.ndarray]:
         """
-        Generate BOM (Book of Market) matrix for CNN input
+        Get cached BOM (Book of Market) matrix for CNN input from data provider
 
-        BOM Matrix contains:
-        - Order book depth (20 levels bid/ask)
-        - Volume profile distribution
-        - Order flow intensity patterns
-        - Market microstructure signals
-        - Exchange-specific liquidity data
+        Uses 1s cached BOM data from the data provider for proper temporal analysis
 
         Returns:
-            np.ndarray: BOM matrix of shape (sequence_length, bom_features)
-            where bom_features typically = 120 features
+            np.ndarray: BOM matrix of shape (sequence_length, 120) from cached 1s data
         """
         try:
             sequence_length = 50  # Match standard CNN sequence length
-            bom_features = []
 
-            # === 1. CONSOLIDATED ORDER BOOK DATA ===
-            cob_features = self._get_cob_bom_features(symbol)
-            if cob_features:
-                bom_features.extend(cob_features)  # ~40 features
-            else:
-                bom_features.extend([0.0] * 40)
-
-            # === 2. VOLUME PROFILE FEATURES ===
-            volume_profile_features = self._get_volume_profile_bom_features(symbol)
-            if volume_profile_features:
-                bom_features.extend(volume_profile_features)  # ~30 features
-            else:
-                bom_features.extend([0.0] * 30)
-
-            # === 3. ORDER FLOW INTENSITY ===
-            flow_intensity_features = self._get_flow_intensity_bom_features(symbol)
-            if flow_intensity_features:
-                bom_features.extend(flow_intensity_features)  # ~25 features
-            else:
-                bom_features.extend([0.0] * 25)
-
-            # === 4. MARKET MICROSTRUCTURE SIGNALS ===
-            microstructure_features = self._get_microstructure_bom_features(symbol)
-            if microstructure_features:
-                bom_features.extend(microstructure_features)  # ~25 features
-            else:
-                bom_features.extend([0.0] * 25)
-
-            # Pad or trim to exactly 120 features
-            if len(bom_features) > 120:
-                bom_features = bom_features[:120]
-            elif len(bom_features) < 120:
-                bom_features.extend([0.0] * (120 - len(bom_features)))
-
-            # Create time series matrix by repeating features across sequence
-            # In real implementation, you might want historical BOM data
-            bom_matrix = np.tile(bom_features, (sequence_length, 1))
-
-            # Add temporal dynamics (simulate order book changes over time)
-            bom_matrix = self._add_temporal_dynamics_to_bom(bom_matrix, symbol)
-
-            logger.debug(f"Generated BOM matrix for {symbol}: shape={bom_matrix.shape}")
-            return bom_matrix.astype(np.float32)
+            # Get cached BOM matrix from data provider
+            if hasattr(self.data_provider, 'get_bom_matrix_for_cnn'):
+                bom_matrix = self.data_provider.get_bom_matrix_for_cnn(symbol, sequence_length)
+                if bom_matrix is not None:
+                    logger.debug(f"Retrieved cached BOM matrix for {symbol}: shape={bom_matrix.shape}")
+                    return bom_matrix
+
+            # Fallback to generating synthetic BOM matrix if no cache available
+            logger.warning(f"No cached BOM data available for {symbol}, generating synthetic")
+            return self._generate_fallback_bom_matrix(symbol, sequence_length)
 
         except Exception as e:
-            logger.warning(f"Error generating BOM matrix for {symbol}: {e}")
-            return None
+            logger.warning(f"Error getting BOM matrix for {symbol}: {e}")
+            return self._generate_fallback_bom_matrix(symbol, sequence_length)
+
+    def _generate_fallback_bom_matrix(self, symbol: str, sequence_length: int) -> np.ndarray:
+        """Generate fallback BOM matrix when cache is not available"""
+        try:
+            # Generate synthetic BOM features for current timestamp
+            if hasattr(self.data_provider, 'generate_synthetic_bom_features'):
+                current_features = self.data_provider.generate_synthetic_bom_features(symbol)
+            else:
+                current_features = [0.0] * 120
+
+            # Create temporal variations for the sequence
+            bom_matrix = np.zeros((sequence_length, 120), dtype=np.float32)
+
+            for i in range(sequence_length):
+                # Add small random variations to simulate temporal changes
+                variation_factor = 0.95 + 0.1 * np.random.random()  # 5% variation
+                varied_features = [f * variation_factor for f in current_features]
+                bom_matrix[i] = np.array(varied_features, dtype=np.float32)
+
+            logger.debug(f"Generated fallback BOM matrix for {symbol}: shape={bom_matrix.shape}")
+            return bom_matrix
+
+        except Exception as e:
+            logger.error(f"Error generating fallback BOM matrix for {symbol}: {e}")
+            # Return zeros as absolute fallback
+            return np.zeros((sequence_length, 120), dtype=np.float32)
 
     def _get_cob_bom_features(self, symbol: str) -> Optional[List[float]]:
         """Extract COB features for BOM matrix (40 features)"""
@@ -1203,33 +1197,77 @@ class EnhancedTradingOrchestrator(TradingOrchestrator):
         Combine traditional market features with BOM matrix features
 
         Args:
-            market_matrix: Traditional market data features (sequence_length, market_features)
-            bom_matrix: BOM matrix features (sequence_length, bom_features)
+            market_matrix: Traditional market data features (timeframes, sequence_length, market_features) - 3D
+            bom_matrix: BOM matrix features (sequence_length, bom_features) - 2D
             symbol: Trading symbol
 
         Returns:
-            Combined feature matrix (sequence_length, market_features + bom_features)
+            Combined feature matrix reshaped for CNN input
         """
         try:
-            # Ensure both matrices have the same sequence length
-            min_length = min(market_matrix.shape[0], bom_matrix.shape[0])
-
-            market_trimmed = market_matrix[:min_length]
-            bom_trimmed = bom_matrix[:min_length]
-
-            # Combine horizontally
-            combined_matrix = np.concatenate([market_trimmed, bom_trimmed], axis=1)
-
-            logger.debug(f"Combined market and BOM features for {symbol}: "
-                         f"market={market_trimmed.shape}, bom={bom_trimmed.shape}, "
-                         f"combined={combined_matrix.shape}")
-
-            return combined_matrix.astype(np.float32)
+            logger.debug(f"Combining features for {symbol}: market={market_matrix.shape}, bom={bom_matrix.shape}")
+
+            # Handle dimensional mismatch
+            if market_matrix.ndim == 3 and bom_matrix.ndim == 2:
+                # Market matrix is (timeframes, sequence_length, features)
+                # BOM matrix is (sequence_length, bom_features)
+
+                # Reshape market matrix to 2D by flattening timeframes dimension
+                timeframes, sequence_length, market_features = market_matrix.shape
+
+                # Option 1: Take the last timeframe (most recent data)
+                market_2d = market_matrix[-1]  # Shape: (sequence_length, market_features)
+
+                # Ensure sequence lengths match
+                min_length = min(market_2d.shape[0], bom_matrix.shape[0])
+                market_trimmed = market_2d[:min_length]
+                bom_trimmed = bom_matrix[:min_length]
+
+                # Combine horizontally
+                combined_matrix = np.concatenate([market_trimmed, bom_trimmed], axis=1)
+
+                logger.debug(f"Combined features for {symbol}: "
+                             f"market_2d={market_trimmed.shape}, bom={bom_trimmed.shape}, "
+                             f"combined={combined_matrix.shape}")
+
+                return combined_matrix.astype(np.float32)
+
+            elif market_matrix.ndim == 2 and bom_matrix.ndim == 2:
+                # Both are 2D - can combine directly
+                min_length = min(market_matrix.shape[0], bom_matrix.shape[0])
+                market_trimmed = market_matrix[:min_length]
+                bom_trimmed = bom_matrix[:min_length]
+
+                combined_matrix = np.concatenate([market_trimmed, bom_trimmed], axis=1)
+
+                logger.debug(f"Combined 2D features for {symbol}: "
+                             f"market={market_trimmed.shape}, bom={bom_trimmed.shape}, "
+                             f"combined={combined_matrix.shape}")
+
+                return combined_matrix.astype(np.float32)
+
+            else:
+                logger.warning(f"Unsupported matrix dimensions for {symbol}: "
+                               f"market={market_matrix.shape}, bom={bom_matrix.shape}")
+                # Fallback: reshape market matrix to 2D if needed
+                if market_matrix.ndim == 3:
+                    market_2d = market_matrix.reshape(-1, market_matrix.shape[-1])
+                else:
+                    market_2d = market_matrix
+
+                return market_2d.astype(np.float32)
 
         except Exception as e:
             logger.error(f"Error combining market and BOM features for {symbol}: {e}")
-            # Fallback to market features only
-            return market_matrix
+            # Fallback to reshaped market features only
+            try:
+                if market_matrix.ndim == 3:
+                    return market_matrix[-1].astype(np.float32)  # Last timeframe
+                else:
+                    return market_matrix.astype(np.float32)
+            except:
+                logger.error(f"Fallback failed for {symbol}, returning zeros")
+                return np.zeros((50, 5), dtype=np.float32)  # Basic fallback
 
     def _get_latest_price_from_universal(self, symbol: str, timeframe: str, universal_stream: UniversalDataStream) -> Optional[float]:
         """Get latest price for symbol and timeframe from universal data stream"""
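Note: the essential change is reducing the 3D market tensor to its most recent timeframe before concatenating it with the 2D BOM matrix along the feature axis. A self-contained sketch with placeholder shapes (5 timeframes, 50 steps and 8 market features are assumptions for illustration, not values from the codebase):

import numpy as np

market_matrix = np.random.rand(5, 50, 8).astype(np.float32)   # (timeframes, seq, features)
bom_matrix = np.random.rand(50, 120).astype(np.float32)        # (seq, bom_features)

market_2d = market_matrix[-1]                                   # (50, 8) - most recent timeframe
min_length = min(market_2d.shape[0], bom_matrix.shape[0])
combined = np.concatenate([market_2d[:min_length], bom_matrix[:min_length]], axis=1)

print(combined.shape)  # (50, 128) - market features followed by BOM features per step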
@@ -1351,20 +1389,55 @@ class EnhancedTradingOrchestrator(TradingOrchestrator):
             # Get model prediction
             prediction_result = model.predict(feature_matrix)
 
-            # Extract predictions (action probabilities)
+            # Extract predictions (action probabilities) - ensure proper array handling
             if isinstance(prediction_result, dict):
-                predictions = prediction_result.get('probabilities', np.array([0.33, 0.33, 0.34]))
+                # Get probabilities as flat array
+                predictions = prediction_result.get('probabilities', [0.33, 0.33, 0.34])
                 confidence = prediction_result.get('confidence', 0.7)
-                # Ensure predictions is a flat numpy array
-                if isinstance(predictions, (list, tuple)):
-                    predictions = np.array(predictions, dtype=np.float32)
-                predictions = np.append(predictions.flatten(), confidence)
+
+                # Convert predictions to numpy array first
+                if isinstance(predictions, np.ndarray):
+                    predictions_array = predictions.flatten()
+                elif isinstance(predictions, (list, tuple)):
+                    predictions_array = np.array(predictions, dtype=np.float32).flatten()
+                else:
+                    predictions_array = np.array([float(predictions)], dtype=np.float32)
+
+                # Create final predictions array with confidence
+                # Use safe tensor conversion to avoid scalar conversion errors
+                confidence_scalar = self._safe_tensor_to_scalar(confidence, default_value=0.7)
+
+                # Combine predictions and confidence as separate elements
+                predictions = np.concatenate([
+                    predictions_array,
+                    np.array([confidence_scalar], dtype=np.float32)
+                ])
+            elif isinstance(prediction_result, tuple) and len(prediction_result) == 2:
+                # Handle (pred_class, pred_proba) tuple from CNN models
+                pred_class, pred_proba = prediction_result
+
+                # Flatten and process the probability array
+                if isinstance(pred_proba, np.ndarray):
+                    if pred_proba.ndim > 1:
+                        # Handle 2D arrays like [[0.1, 0.2, 0.7]]
+                        pred_proba_flat = pred_proba.flatten()
+                    else:
+                        # Already 1D
+                        pred_proba_flat = pred_proba
+
+                    # Use the probability values as the predictions array
+                    predictions = pred_proba_flat.astype(np.float32)
+                else:
+                    # Fallback: use class prediction only
+                    predictions = np.array([float(pred_class)], dtype=np.float32)
             else:
-                # Ensure prediction_result is a flat numpy array
-                if isinstance(prediction_result, (list, tuple)):
+                # Handle direct prediction result
+                if isinstance(prediction_result, np.ndarray):
+                    predictions = prediction_result.flatten()
+                elif isinstance(prediction_result, (list, tuple)):
                     predictions = np.array(prediction_result, dtype=np.float32).flatten()
                 else:
-                    predictions = np.array(prediction_result).flatten()
+                    predictions = np.array([float(prediction_result)], dtype=np.float32)
 
             # Extract hidden features if model supports it
             hidden_features = None
@@ -4652,4 +4725,38 @@ class EnhancedTradingOrchestrator(TradingOrchestrator):
             'price_to_pivot_ratio': 1.0,
             'volume_strength': 1.0,
             'pivot_strength': 0.5
         }
+
+    # Helper function to safely extract scalar values from tensors
+    def _safe_tensor_to_scalar(self, tensor_value, default_value: float = 0.7) -> float:
+        """
+        Safely convert tensor/array values to Python scalar floats
+
+        Args:
+            tensor_value: Input tensor, array, or scalar value
+            default_value: Default value to return if conversion fails
+
+        Returns:
+            Python float scalar value
+        """
+        try:
+            if hasattr(tensor_value, 'item'):
+                # PyTorch tensor - handle different shapes
+                if tensor_value.numel() == 1:
+                    return float(tensor_value.item())
+                else:
+                    return float(tensor_value.flatten()[0].item())
+            elif isinstance(tensor_value, np.ndarray):
+                # NumPy array - handle different shapes
+                if tensor_value.ndim == 0:
+                    return float(tensor_value.item())
+                elif tensor_value.size == 1:
+                    return float(tensor_value.flatten()[0])
+                else:
+                    return float(tensor_value.flat[0])
+            else:
+                # Already a scalar value
+                return float(tensor_value)
+        except Exception as e:
+            logger.warning(f"Error converting tensor to scalar, using default {default_value}: {e}")
+            return default_value
@@ -25,9 +25,11 @@ import logging
 import time
 try:
     import websockets
+    from websockets.client import connect as websockets_connect
 except ImportError:
     # Fallback for environments where websockets is not available
     websockets = None
+    websockets_connect = None
 import numpy as np
 import pandas as pd
 from datetime import datetime, timedelta
@@ -465,9 +467,9 @@ class MultiExchangeCOBProvider:
             ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@depth20@100ms"
             logger.info(f"Connecting to Binance WebSocket: {ws_url}")
 
-            if websockets is None:
+            if websockets is None or websockets_connect is None:
                 raise ImportError("websockets module not available")
-            async with websockets.connect(ws_url) as websocket:
+            async with websockets_connect(ws_url) as websocket:
                 self.exchange_order_books[symbol]['binance']['connected'] = True
                 logger.info(f"Connected to Binance order book stream for {symbol}")
 
@@ -696,9 +698,9 @@ class MultiExchangeCOBProvider:
             ws_url = f"{config.websocket_url}{config.symbols_mapping[symbol].lower()}@trade"
             logger.info(f"Connecting to Binance trade stream: {ws_url}")
 
-            if websockets is None:
+            if websockets is None or websockets_connect is None:
                 raise ImportError("websockets module not available")
-            async with websockets.connect(ws_url) as websocket:
+            async with websockets_connect(ws_url) as websocket:
                 logger.info(f"Connected to Binance trade stream for {symbol}")
 
                 async for message in websocket:
main.py
@@ -53,6 +53,13 @@ async def run_web_dashboard():
         # Create data provider
         data_provider = DataProvider()
 
+        # Start real-time streaming for BOM caching
+        try:
+            await data_provider.start_real_time_streaming()
+            logger.info("[SUCCESS] Real-time data streaming started for BOM caching")
+        except Exception as e:
+            logger.warning(f"[WARNING] Real-time streaming failed: {e}")
+
         # Verify data connection
         logger.info("[DATA] Verifying live data connection...")
         symbol = config.get('symbols', ['ETH/USDT'])[0]
@@ -116,12 +123,12 @@ async def run_web_dashboard():
         import traceback
         logger.error(traceback.format_exc())
 
-def start_web_ui():
+def start_web_ui(port=8051):
     """Start the main TradingDashboard UI in a separate thread"""
     try:
         logger.info("=" * 50)
         logger.info("Starting Main Trading Dashboard UI...")
-        logger.info("Trading Dashboard: http://127.0.0.1:8051")
+        logger.info(f"Trading Dashboard: http://127.0.0.1:{port}")
         logger.info("COB Integration: ENABLED (Real-time order book visualization)")
         logger.info("=" * 50)
 
@@ -135,6 +142,19 @@ def start_web_ui():
         config = get_config()
         data_provider = DataProvider()
 
+        # Start real-time streaming for BOM caching (non-blocking)
+        try:
+            import threading
+            def start_streaming():
+                import asyncio
+                asyncio.run(data_provider.start_real_time_streaming())
+
+            streaming_thread = threading.Thread(target=start_streaming, daemon=True)
+            streaming_thread.start()
+            logger.info("[SUCCESS] Real-time streaming thread started for dashboard")
+        except Exception as e:
+            logger.warning(f"[WARNING] Dashboard streaming setup failed: {e}")
+
         # Load model registry for enhanced features
         try:
             from models import get_model_registry
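Note: start_web_ui() is synchronous, so the async streaming start-up is pushed onto a daemon thread that owns its own event loop via asyncio.run. A standalone sketch of that pattern (the coroutine body here is a stand-in, not the project's start_real_time_streaming):

import asyncio
import threading
import time

async def fake_streaming():
    # Stand-in for an async start-up routine; runs on the worker thread's own loop.
    while True:
        await asyncio.sleep(1.0)

# asyncio.run() creates and owns a fresh event loop on the daemon thread, so the
# synchronous caller returns immediately and is never blocked.
worker = threading.Thread(target=lambda: asyncio.run(fake_streaming()), daemon=True)
worker.start()

time.sleep(3)  # main thread keeps doing synchronous work (e.g. serving the dashboard)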
@@ -163,7 +183,7 @@ def start_web_ui():
         logger.info("Features: Live trading, COB visualization, RL training monitoring, Position management")
 
         # Run the dashboard server (COB integration will start automatically)
-        dashboard.app.run(host='127.0.0.1', port=8051, debug=False, use_reloader=False)
+        dashboard.app.run(host='127.0.0.1', port=port, debug=False, use_reloader=False)
 
     except Exception as e:
         logger.error(f"Error starting main trading dashboard UI: {e}")
@@ -246,7 +266,7 @@ async def main():
     logger.info("STREAMLINED TRADING SYSTEM - TRAINING + MAIN DASHBOARD")
     logger.info(f"Primary Symbol: {args.symbol}")
     logger.info(f"Training Port: {args.port}")
-    logger.info(f"Main Trading Dashboard: http://127.0.0.1:8051")
+    logger.info(f"Main Trading Dashboard: http://127.0.0.1:{args.port}")
     logger.info("2-Action System: BUY/SELL with intelligent position management")
     logger.info("Always Invested: Learning to spot high risk/reward setups")
     logger.info("Flow: Data -> COB -> Indicators -> CNN -> RL -> Orchestrator -> Execution")
@@ -254,7 +274,7 @@ async def main():
     logger.info("=" * 70)
 
     # Start main trading dashboard UI in a separate thread
-    web_thread = Thread(target=start_web_ui, daemon=True)
+    web_thread = Thread(target=lambda: start_web_ui(args.port), daemon=True)
     web_thread.start()
     logger.info("Main trading dashboard UI thread started")
 
@@ -125,8 +125,16 @@ except ImportError:
 
             outputs = self.model(X)
             probs = F.softmax(outputs, dim=1)
-            pred_class = torch.argmax(probs, dim=1).numpy()
-            pred_proba = probs.numpy()
+
+            # Ensure proper tensor conversion to avoid scalar conversion errors
+            pred_class = torch.argmax(probs, dim=1).detach().cpu().numpy()
+            pred_proba = probs.detach().cpu().numpy()
+
+            # Handle single batch case - ensure scalars are properly extracted
+            if pred_class.ndim > 0 and pred_class.size == 1:
+                pred_class = pred_class.item()  # Convert to Python scalar
+            if pred_proba.ndim > 1 and pred_proba.shape[0] == 1:
+                pred_proba = pred_proba[0]  # Remove batch dimension
+
             logger.debug(f"Fallback CNN prediction: class={pred_class}, proba_shape={pred_proba.shape}")
             return pred_class, pred_proba