additional logging for data stream
This commit is contained in:
@@ -15,6 +15,28 @@ from collections import deque
|
||||
import threading
|
||||
import os
|
||||
|
||||
# Set up separate logger for data stream monitor
|
||||
stream_logger = logging.getLogger('data_stream_monitor')
|
||||
stream_logger.setLevel(logging.INFO)
|
||||
|
||||
# Create file handler for data stream logs
|
||||
stream_log_file = os.path.join('logs', 'data_stream_monitor.log')
|
||||
os.makedirs(os.path.dirname(stream_log_file), exist_ok=True)
|
||||
|
||||
stream_handler = logging.FileHandler(stream_log_file)
|
||||
stream_handler.setLevel(logging.INFO)
|
||||
|
||||
# Create formatter
|
||||
stream_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
stream_handler.setFormatter(stream_formatter)
|
||||
|
||||
# Add handler to logger (only if not already added)
|
||||
if not stream_logger.handlers:
|
||||
stream_logger.addHandler(stream_handler)
|
||||
|
||||
# Prevent propagation to root logger to avoid duplicate logs
|
||||
stream_logger.propagate = False
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class DataStreamMonitor:
|
||||
@@ -464,7 +486,7 @@ class DataStreamMonitor:
|
||||
summary['imbalance'] = latest_cob['imbalance']
|
||||
summary['spread_bps'] = latest_cob['spread_bps']
|
||||
|
||||
print(f"DATA_STREAM: {json.dumps(summary, separators=(',', ':'))}")
|
||||
stream_logger.info(f"DATA_STREAM: {json.dumps(summary, separators=(',', ':'))}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in compact output: {e}")
|
||||
@@ -472,24 +494,24 @@ class DataStreamMonitor:
|
||||
def _output_detailed_format(self, sample_data: Dict):
|
||||
"""Output data in detailed human-readable format"""
|
||||
try:
|
||||
print(f"\n{'='*80}")
|
||||
print(f"DATA STREAM SAMPLE - {datetime.now().strftime('%H:%M:%S')}")
|
||||
print(f"{'='*80}")
|
||||
stream_logger.info(f"{'='*80}")
|
||||
stream_logger.info(f"DATA STREAM SAMPLE - {datetime.now().strftime('%H:%M:%S')}")
|
||||
stream_logger.info(f"{'='*80}")
|
||||
|
||||
# OHLCV Data
|
||||
if sample_data.get('ohlcv_1m'):
|
||||
latest = sample_data['ohlcv_1m'][-1]
|
||||
print(f"OHLCV (1m): {latest['symbol']} | O:{latest['open']:.2f} H:{latest['high']:.2f} L:{latest['low']:.2f} C:{latest['close']:.2f} V:{latest['volume']:.1f}")
|
||||
stream_logger.info(f"OHLCV (1m): {latest['symbol']} | O:{latest['open']:.2f} H:{latest['high']:.2f} L:{latest['low']:.2f} C:{latest['close']:.2f} V:{latest['volume']:.1f}")
|
||||
|
||||
# Tick Data
|
||||
if sample_data.get('ticks'):
|
||||
latest_tick = sample_data['ticks'][-1]
|
||||
print(f"TICK: {latest_tick['symbol']} | Price:{latest_tick['price']:.2f} Vol:{latest_tick['volume']:.4f} Side:{latest_tick['side']}")
|
||||
stream_logger.info(f"TICK: {latest_tick['symbol']} | Price:{latest_tick['price']:.2f} Vol:{latest_tick['volume']:.4f} Side:{latest_tick['side']}")
|
||||
|
||||
# COB Data
|
||||
if sample_data.get('cob_raw'):
|
||||
latest_cob = sample_data['cob_raw'][-1]
|
||||
print(f"COB: {latest_cob['symbol']} | Imbalance:{latest_cob['imbalance']:.3f} Spread:{latest_cob['spread_bps']:.1f}bps Mid:{latest_cob['mid_price']:.2f}")
|
||||
stream_logger.info(f"COB: {latest_cob['symbol']} | Imbalance:{latest_cob['imbalance']:.3f} Spread:{latest_cob['spread_bps']:.1f}bps Mid:{latest_cob['mid_price']:.2f}")
|
||||
|
||||
# Model States
|
||||
if sample_data.get('model_states'):
|
||||
@@ -498,7 +520,7 @@ class DataStreamMonitor:
|
||||
if 'dqn' in models:
|
||||
dqn_state = models['dqn']
|
||||
state_vec = dqn_state.get('state_vector', [])
|
||||
print(f"DQN State: {len(state_vec)} features | Price:{state_vec[0]*10000:.2f} if state_vec else 'No state'")
|
||||
stream_logger.info(f"DQN State: {len(state_vec)} features | Price:{state_vec[0]*10000:.2f} if state_vec else 'No state'")
|
||||
|
||||
# Predictions
|
||||
if sample_data.get('predictions'):
|
||||
@@ -508,7 +530,7 @@ class DataStreamMonitor:
|
||||
latest_pred = preds[-1]
|
||||
action = latest_pred.get('action', 'N/A')
|
||||
conf = latest_pred.get('confidence', 0)
|
||||
print(f"{model_name.upper()} Prediction: {action} (conf:{conf:.2f})")
|
||||
stream_logger.info(f"{model_name.upper()} Prediction: {action} (conf:{conf:.2f})")
|
||||
|
||||
# Training Experiences
|
||||
if sample_data.get('training_experiences'):
|
||||
@@ -516,9 +538,9 @@ class DataStreamMonitor:
|
||||
reward = latest_exp.get('reward', 0)
|
||||
action = latest_exp.get('action', 'N/A')
|
||||
done = latest_exp.get('done', False)
|
||||
print(f"Training Exp: Action:{action} Reward:{reward:.4f} Done:{done}")
|
||||
stream_logger.info(f"Training Exp: Action:{action} Reward:{reward:.4f} Done:{done}")
|
||||
|
||||
print(f"{'='*80}")
|
||||
stream_logger.info(f"{'='*80}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in detailed output: {e}")
|
||||
|
Reference in New Issue
Block a user