text exporter
This commit is contained in:
@@ -446,8 +446,8 @@ class EnhancedCOBWebSocket:
|
||||
# Add ping/pong handling and proper connection management
|
||||
async with websockets_connect(
|
||||
ws_url,
|
||||
ping_interval=20, # Binance sends ping every 20 seconds
|
||||
ping_timeout=60, # Binance disconnects after 1 minute without pong
|
||||
ping_interval=25, # Slightly longer than default
|
||||
ping_timeout=90, # Allow longer time before timeout
|
||||
close_timeout=10
|
||||
) as websocket:
|
||||
# Connection successful
|
||||
@@ -539,8 +539,11 @@ class EnhancedCOBWebSocket:
|
||||
|
||||
# Wait before reconnecting
|
||||
status.increase_reconnect_delay()
|
||||
logger.info(f"Waiting {status.reconnect_delay:.1f}s before reconnecting {symbol}")
|
||||
await asyncio.sleep(status.reconnect_delay)
|
||||
# Add jitter to avoid synchronized reconnects
|
||||
jitter = 0.5 + (random.random() * 1.5)
|
||||
delay = status.reconnect_delay * jitter
|
||||
logger.info(f"Waiting {delay:.1f}s before reconnecting {symbol}")
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
async def _process_websocket_message(self, symbol: str, data: Dict):
|
||||
"""Process WebSocket message and convert to COB format
|
||||
|
@@ -28,6 +28,9 @@ import shutil
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.optim as optim
|
||||
|
||||
# Text export integration
|
||||
from .text_export_integration import TextExportManager
|
||||
import pandas as pd
|
||||
from pathlib import Path
|
||||
|
||||
@@ -568,6 +571,7 @@ class TradingOrchestrator:
|
||||
self._initialize_decision_fusion() # Initialize fusion system
|
||||
self._initialize_transformer_model() # Initialize transformer model
|
||||
self._initialize_enhanced_training_system() # Initialize real-time training
|
||||
self._initialize_text_export_manager() # Initialize text data export
|
||||
|
||||
def _normalize_model_name(self, name: str) -> str:
|
||||
"""Map various registry/UI names to canonical toggle keys."""
|
||||
@@ -7023,6 +7027,66 @@ class TradingOrchestrator:
|
||||
logger.error(f"Error stopping enhanced training: {e}")
|
||||
return False
|
||||
|
||||
def _initialize_text_export_manager(self):
|
||||
"""Initialize the text data export manager"""
|
||||
try:
|
||||
self.text_export_manager = TextExportManager(
|
||||
data_provider=self.data_provider,
|
||||
orchestrator=self
|
||||
)
|
||||
|
||||
# Configure with current symbols
|
||||
export_config = {
|
||||
'main_symbol': self.symbol,
|
||||
'ref1_symbol': self.ref_symbols[0] if self.ref_symbols else 'BTC/USDT',
|
||||
'ref2_symbol': 'SPX', # Default to SPX for now
|
||||
'export_dir': 'data/text_exports'
|
||||
}
|
||||
|
||||
self.text_export_manager.export_config.update(export_config)
|
||||
logger.info("Text export manager initialized")
|
||||
logger.info(f" - Main symbol: {export_config['main_symbol']}")
|
||||
logger.info(f" - Reference symbols: {export_config['ref1_symbol']}, {export_config['ref2_symbol']}")
|
||||
logger.info(f" - Export directory: {export_config['export_dir']}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error initializing text export manager: {e}")
|
||||
self.text_export_manager = None
|
||||
|
||||
def start_text_export(self) -> bool:
|
||||
"""Start text data export"""
|
||||
try:
|
||||
if not hasattr(self, 'text_export_manager') or not self.text_export_manager:
|
||||
logger.warning("Text export manager not initialized")
|
||||
return False
|
||||
|
||||
return self.text_export_manager.start_export()
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting text export: {e}")
|
||||
return False
|
||||
|
||||
def stop_text_export(self) -> bool:
|
||||
"""Stop text data export"""
|
||||
try:
|
||||
if not hasattr(self, 'text_export_manager') or not self.text_export_manager:
|
||||
return True
|
||||
|
||||
return self.text_export_manager.stop_export()
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping text export: {e}")
|
||||
return False
|
||||
|
||||
def get_text_export_status(self) -> Dict[str, Any]:
|
||||
"""Get text export status"""
|
||||
try:
|
||||
if not hasattr(self, 'text_export_manager') or not self.text_export_manager:
|
||||
return {'enabled': False, 'initialized': False, 'error': 'Not initialized'}
|
||||
|
||||
return self.text_export_manager.get_export_status()
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting text export status: {e}")
|
||||
return {'enabled': False, 'initialized': False, 'error': str(e)}
|
||||
|
||||
def get_enhanced_training_stats(self) -> Dict[str, Any]:
|
||||
"""Get enhanced training system statistics with orchestrator integration"""
|
||||
try:
|
||||
|
321
core/text_data_exporter.py
Normal file
321
core/text_data_exporter.py
Normal file
@@ -0,0 +1,321 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Text Data Exporter - CSV Interface for External Systems
|
||||
Exports market data in CSV format for integration with text-based systems
|
||||
"""
|
||||
|
||||
import os
|
||||
import csv
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Optional, Any
|
||||
from dataclasses import dataclass
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@dataclass
|
||||
class MarketDataPoint:
|
||||
"""Single market data point"""
|
||||
symbol: str
|
||||
timeframe: str
|
||||
open: float
|
||||
high: float
|
||||
low: float
|
||||
close: float
|
||||
volume: float
|
||||
timestamp: datetime
|
||||
|
||||
class TextDataExporter:
|
||||
"""
|
||||
Exports market data to CSV files for external text-based systems
|
||||
|
||||
Features:
|
||||
- Multi-symbol support (MAIN + REF1 + REF2)
|
||||
- Multi-timeframe (1s, 1m, 1h, 1d)
|
||||
- File rotation every minute
|
||||
- Overwrites within the same minute
|
||||
- Thread-safe operations
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
data_provider=None,
|
||||
export_dir: str = "data/text_exports",
|
||||
main_symbol: str = "ETH/USDT",
|
||||
ref1_symbol: str = "BTC/USDT",
|
||||
ref2_symbol: str = "SPX"):
|
||||
"""
|
||||
Initialize text data exporter
|
||||
|
||||
Args:
|
||||
data_provider: Data provider instance
|
||||
export_dir: Directory for CSV exports
|
||||
main_symbol: Main trading symbol (ETH)
|
||||
ref1_symbol: Reference symbol 1 (BTC)
|
||||
ref2_symbol: Reference symbol 2 (SPX)
|
||||
"""
|
||||
self.data_provider = data_provider
|
||||
self.export_dir = export_dir
|
||||
self.main_symbol = main_symbol
|
||||
self.ref1_symbol = ref1_symbol
|
||||
self.ref2_symbol = ref2_symbol
|
||||
|
||||
# Timeframes to export
|
||||
self.timeframes = ['1s', '1m', '1h', '1d']
|
||||
|
||||
# File management
|
||||
self.current_minute = None
|
||||
self.current_filename = None
|
||||
self.export_lock = threading.Lock()
|
||||
|
||||
# Running state
|
||||
self.is_running = False
|
||||
self.export_thread = None
|
||||
|
||||
# Create export directory
|
||||
os.makedirs(self.export_dir, exist_ok=True)
|
||||
|
||||
logger.info(f"Text Data Exporter initialized - Export dir: {self.export_dir}")
|
||||
logger.info(f"Symbols: MAIN={main_symbol}, REF1={ref1_symbol}, REF2={ref2_symbol}")
|
||||
|
||||
def start(self):
|
||||
"""Start the data export process"""
|
||||
if self.is_running:
|
||||
logger.warning("Text data exporter already running")
|
||||
return
|
||||
|
||||
self.is_running = True
|
||||
self.export_thread = threading.Thread(target=self._export_loop, daemon=True)
|
||||
self.export_thread.start()
|
||||
logger.info("Text data exporter started")
|
||||
|
||||
def stop(self):
|
||||
"""Stop the data export process"""
|
||||
self.is_running = False
|
||||
if self.export_thread:
|
||||
self.export_thread.join(timeout=5)
|
||||
logger.info("Text data exporter stopped")
|
||||
|
||||
def _export_loop(self):
|
||||
"""Main export loop - runs every second"""
|
||||
while self.is_running:
|
||||
try:
|
||||
self._export_current_data()
|
||||
time.sleep(1) # Export every second
|
||||
except Exception as e:
|
||||
logger.error(f"Error in export loop: {e}")
|
||||
time.sleep(1)
|
||||
|
||||
def _export_current_data(self):
|
||||
"""Export current market data to CSV"""
|
||||
try:
|
||||
current_time = datetime.now()
|
||||
current_minute_key = current_time.strftime("%Y%m%d_%H%M")
|
||||
|
||||
# Check if we need a new file (new minute)
|
||||
if self.current_minute != current_minute_key:
|
||||
self.current_minute = current_minute_key
|
||||
self.current_filename = f"market_data_{current_minute_key}.csv"
|
||||
logger.info(f"Starting new export file: {self.current_filename}")
|
||||
|
||||
# Gather data for all symbols and timeframes
|
||||
export_data = self._gather_export_data()
|
||||
|
||||
if export_data:
|
||||
self._write_csv_file(export_data)
|
||||
else:
|
||||
logger.debug("No data available for export")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error exporting data: {e}")
|
||||
|
||||
def _gather_export_data(self) -> List[Dict[str, Any]]:
|
||||
"""Gather market data for all symbols and timeframes"""
|
||||
export_rows = []
|
||||
|
||||
if not self.data_provider:
|
||||
return export_rows
|
||||
|
||||
symbols = [
|
||||
("MAIN", self.main_symbol),
|
||||
("REF1", self.ref1_symbol),
|
||||
("REF2", self.ref2_symbol)
|
||||
]
|
||||
|
||||
for symbol_type, symbol in symbols:
|
||||
for timeframe in self.timeframes:
|
||||
try:
|
||||
# Get latest data for this symbol/timeframe
|
||||
data_point = self._get_latest_data(symbol, timeframe)
|
||||
if data_point:
|
||||
export_rows.append({
|
||||
'symbol_type': symbol_type,
|
||||
'symbol': symbol,
|
||||
'timeframe': timeframe,
|
||||
'open': data_point.open,
|
||||
'high': data_point.high,
|
||||
'low': data_point.low,
|
||||
'close': data_point.close,
|
||||
'volume': data_point.volume,
|
||||
'timestamp': data_point.timestamp
|
||||
})
|
||||
except Exception as e:
|
||||
logger.debug(f"Error getting data for {symbol} {timeframe}: {e}")
|
||||
|
||||
return export_rows
|
||||
|
||||
def _get_latest_data(self, symbol: str, timeframe: str) -> Optional[MarketDataPoint]:
|
||||
"""Get latest market data for symbol/timeframe"""
|
||||
try:
|
||||
if not hasattr(self.data_provider, 'get_latest_candle'):
|
||||
return None
|
||||
|
||||
# Try to get latest candle data
|
||||
candle = self.data_provider.get_latest_candle(symbol, timeframe)
|
||||
if not candle:
|
||||
return None
|
||||
|
||||
# Convert to MarketDataPoint
|
||||
return MarketDataPoint(
|
||||
symbol=symbol,
|
||||
timeframe=timeframe,
|
||||
open=float(candle.get('open', 0)),
|
||||
high=float(candle.get('high', 0)),
|
||||
low=float(candle.get('low', 0)),
|
||||
close=float(candle.get('close', 0)),
|
||||
volume=float(candle.get('volume', 0)),
|
||||
timestamp=candle.get('timestamp', datetime.now())
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error getting latest data for {symbol} {timeframe}: {e}")
|
||||
return None
|
||||
|
||||
def _write_csv_file(self, export_data: List[Dict[str, Any]]):
|
||||
"""Write data to CSV file"""
|
||||
if not export_data:
|
||||
return
|
||||
|
||||
filepath = os.path.join(self.export_dir, self.current_filename)
|
||||
|
||||
with self.export_lock:
|
||||
try:
|
||||
with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
|
||||
# Create header based on the format specification
|
||||
fieldnames = self._create_csv_header()
|
||||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
||||
|
||||
# Write header
|
||||
writer.writeheader()
|
||||
|
||||
# Group data by symbol type for organized output
|
||||
grouped_data = self._group_data_by_symbol(export_data)
|
||||
|
||||
# Write data rows
|
||||
for row in self._format_csv_rows(grouped_data):
|
||||
writer.writerow(row)
|
||||
|
||||
logger.debug(f"Exported {len(export_data)} data points to {filepath}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing CSV file {filepath}: {e}")
|
||||
|
||||
def _create_csv_header(self) -> List[str]:
|
||||
"""Create CSV header based on specification"""
|
||||
header = ['symbol']
|
||||
|
||||
# Add columns for each symbol type and timeframe
|
||||
for symbol_type in ['MAIN', 'REF1', 'REF2']:
|
||||
for timeframe in self.timeframes:
|
||||
prefix = f"{symbol_type}_{timeframe}"
|
||||
header.extend([
|
||||
f"{prefix}_O", # Open
|
||||
f"{prefix}_H", # High
|
||||
f"{prefix}_L", # Low
|
||||
f"{prefix}_C", # Close
|
||||
f"{prefix}_V", # Volume
|
||||
f"{prefix}_T" # Timestamp
|
||||
])
|
||||
|
||||
return header
|
||||
|
||||
def _group_data_by_symbol(self, export_data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
||||
"""Group data by symbol type and timeframe"""
|
||||
grouped = {}
|
||||
|
||||
for data_point in export_data:
|
||||
symbol_type = data_point['symbol_type']
|
||||
timeframe = data_point['timeframe']
|
||||
|
||||
if symbol_type not in grouped:
|
||||
grouped[symbol_type] = {}
|
||||
|
||||
grouped[symbol_type][timeframe] = data_point
|
||||
|
||||
return grouped
|
||||
|
||||
def _format_csv_rows(self, grouped_data: Dict[str, Dict[str, Dict[str, Any]]]) -> List[Dict[str, Any]]:
|
||||
"""Format data into CSV rows"""
|
||||
rows = []
|
||||
|
||||
# Create a single row with all data
|
||||
row = {'symbol': f"{self.main_symbol.split('/')[0]} ({self.ref1_symbol.split('/')[0]}, {self.ref2_symbol})"}
|
||||
|
||||
for symbol_type in ['MAIN', 'REF1', 'REF2']:
|
||||
symbol_data = grouped_data.get(symbol_type, {})
|
||||
|
||||
for timeframe in self.timeframes:
|
||||
prefix = f"{symbol_type}_{timeframe}"
|
||||
data_point = symbol_data.get(timeframe)
|
||||
|
||||
if data_point:
|
||||
row[f"{prefix}_O"] = f"{data_point['open']:.6f}"
|
||||
row[f"{prefix}_H"] = f"{data_point['high']:.6f}"
|
||||
row[f"{prefix}_L"] = f"{data_point['low']:.6f}"
|
||||
row[f"{prefix}_C"] = f"{data_point['close']:.6f}"
|
||||
row[f"{prefix}_V"] = f"{data_point['volume']:.2f}"
|
||||
row[f"{prefix}_T"] = data_point['timestamp'].strftime("%Y-%m-%d %H:%M:%S")
|
||||
else:
|
||||
# Empty values if no data
|
||||
row[f"{prefix}_O"] = ""
|
||||
row[f"{prefix}_H"] = ""
|
||||
row[f"{prefix}_L"] = ""
|
||||
row[f"{prefix}_C"] = ""
|
||||
row[f"{prefix}_V"] = ""
|
||||
row[f"{prefix}_T"] = ""
|
||||
|
||||
rows.append(row)
|
||||
return rows
|
||||
|
||||
def get_current_filename(self) -> Optional[str]:
|
||||
"""Get current export filename"""
|
||||
return self.current_filename
|
||||
|
||||
def get_export_stats(self) -> Dict[str, Any]:
|
||||
"""Get export statistics"""
|
||||
stats = {
|
||||
'is_running': self.is_running,
|
||||
'export_dir': self.export_dir,
|
||||
'current_filename': self.current_filename,
|
||||
'symbols': {
|
||||
'main': self.main_symbol,
|
||||
'ref1': self.ref1_symbol,
|
||||
'ref2': self.ref2_symbol
|
||||
},
|
||||
'timeframes': self.timeframes
|
||||
}
|
||||
|
||||
# Add file count
|
||||
try:
|
||||
files = [f for f in os.listdir(self.export_dir) if f.endswith('.csv')]
|
||||
stats['total_files'] = len(files)
|
||||
except:
|
||||
stats['total_files'] = 0
|
||||
|
||||
return stats
|
||||
|
||||
# Convenience function for integration
|
||||
def create_text_exporter(data_provider=None, **kwargs) -> TextDataExporter:
|
||||
"""Create and return a TextDataExporter instance"""
|
||||
return TextDataExporter(data_provider=data_provider, **kwargs)
|
233
core/text_export_integration.py
Normal file
233
core/text_export_integration.py
Normal file
@@ -0,0 +1,233 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Text Export Integration - Connects TextDataExporter with existing data systems
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional, Dict, Any
|
||||
from datetime import datetime
|
||||
from .text_data_exporter import TextDataExporter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class TextExportManager:
|
||||
"""
|
||||
Manages text data export integration with the trading system
|
||||
"""
|
||||
|
||||
def __init__(self, data_provider=None, orchestrator=None):
|
||||
"""
|
||||
Initialize text export manager
|
||||
|
||||
Args:
|
||||
data_provider: Main data provider instance
|
||||
orchestrator: Trading orchestrator instance
|
||||
"""
|
||||
self.data_provider = data_provider
|
||||
self.orchestrator = orchestrator
|
||||
self.text_exporter: Optional[TextDataExporter] = None
|
||||
|
||||
# Configuration
|
||||
self.export_enabled = False
|
||||
self.export_config = {
|
||||
'main_symbol': 'ETH/USDT',
|
||||
'ref1_symbol': 'BTC/USDT',
|
||||
'ref2_symbol': 'SPX', # Will need to be mapped to available data
|
||||
'export_dir': 'data/text_exports'
|
||||
}
|
||||
|
||||
def initialize_exporter(self, config: Optional[Dict[str, Any]] = None):
|
||||
"""Initialize the text data exporter"""
|
||||
try:
|
||||
if config:
|
||||
self.export_config.update(config)
|
||||
|
||||
# Create enhanced data provider wrapper
|
||||
enhanced_provider = EnhancedDataProviderWrapper(
|
||||
self.data_provider,
|
||||
self.orchestrator
|
||||
)
|
||||
|
||||
# Create text exporter
|
||||
self.text_exporter = TextDataExporter(
|
||||
data_provider=enhanced_provider,
|
||||
export_dir=self.export_config['export_dir'],
|
||||
main_symbol=self.export_config['main_symbol'],
|
||||
ref1_symbol=self.export_config['ref1_symbol'],
|
||||
ref2_symbol=self.export_config['ref2_symbol']
|
||||
)
|
||||
|
||||
logger.info("Text data exporter initialized successfully")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error initializing text exporter: {e}")
|
||||
return False
|
||||
|
||||
def start_export(self):
|
||||
"""Start text data export"""
|
||||
if not self.text_exporter:
|
||||
if not self.initialize_exporter():
|
||||
logger.error("Cannot start export - initialization failed")
|
||||
return False
|
||||
|
||||
try:
|
||||
self.text_exporter.start()
|
||||
self.export_enabled = True
|
||||
logger.info("Text data export started")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting text export: {e}")
|
||||
return False
|
||||
|
||||
def stop_export(self):
|
||||
"""Stop text data export"""
|
||||
if self.text_exporter:
|
||||
try:
|
||||
self.text_exporter.stop()
|
||||
self.export_enabled = False
|
||||
logger.info("Text data export stopped")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error stopping text export: {e}")
|
||||
return False
|
||||
return True
|
||||
|
||||
def get_export_status(self) -> Dict[str, Any]:
|
||||
"""Get current export status"""
|
||||
status = {
|
||||
'enabled': self.export_enabled,
|
||||
'initialized': self.text_exporter is not None,
|
||||
'config': self.export_config.copy()
|
||||
}
|
||||
|
||||
if self.text_exporter:
|
||||
status.update(self.text_exporter.get_export_stats())
|
||||
|
||||
return status
|
||||
|
||||
def update_config(self, new_config: Dict[str, Any]):
|
||||
"""Update export configuration"""
|
||||
old_enabled = self.export_enabled
|
||||
|
||||
# Stop if running
|
||||
if old_enabled:
|
||||
self.stop_export()
|
||||
|
||||
# Update config
|
||||
self.export_config.update(new_config)
|
||||
|
||||
# Reinitialize
|
||||
self.text_exporter = None
|
||||
|
||||
# Restart if was enabled
|
||||
if old_enabled:
|
||||
self.start_export()
|
||||
|
||||
logger.info(f"Text export config updated: {new_config}")
|
||||
|
||||
class EnhancedDataProviderWrapper:
|
||||
"""
|
||||
Wrapper around the existing data provider to provide the interface
|
||||
expected by TextDataExporter
|
||||
"""
|
||||
|
||||
def __init__(self, data_provider, orchestrator=None):
|
||||
self.data_provider = data_provider
|
||||
self.orchestrator = orchestrator
|
||||
|
||||
# Timeframe mapping
|
||||
self.timeframe_map = {
|
||||
'1s': '1s',
|
||||
'1m': '1m',
|
||||
'1h': '1h',
|
||||
'1d': '1d'
|
||||
}
|
||||
|
||||
def get_latest_candle(self, symbol: str, timeframe: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get latest candle data for symbol/timeframe"""
|
||||
try:
|
||||
# Handle special symbols
|
||||
if symbol == 'SPX':
|
||||
return self._get_spx_data()
|
||||
|
||||
# Map timeframe
|
||||
mapped_timeframe = self.timeframe_map.get(timeframe, timeframe)
|
||||
|
||||
# Try different methods to get data
|
||||
candle_data = None
|
||||
|
||||
# Method 1: Direct candle data
|
||||
if hasattr(self.data_provider, 'get_latest_candle'):
|
||||
candle_data = self.data_provider.get_latest_candle(symbol, mapped_timeframe)
|
||||
|
||||
# Method 2: From candle buffer
|
||||
elif hasattr(self.data_provider, 'candle_buffer'):
|
||||
buffer_key = f"{symbol}_{mapped_timeframe}"
|
||||
if buffer_key in self.data_provider.candle_buffer:
|
||||
candles = self.data_provider.candle_buffer[buffer_key]
|
||||
if candles:
|
||||
latest = candles[-1]
|
||||
candle_data = {
|
||||
'open': latest.get('open', 0),
|
||||
'high': latest.get('high', 0),
|
||||
'low': latest.get('low', 0),
|
||||
'close': latest.get('close', 0),
|
||||
'volume': latest.get('volume', 0),
|
||||
'timestamp': latest.get('timestamp', datetime.now())
|
||||
}
|
||||
|
||||
# Method 3: From tick data (for 1s timeframe)
|
||||
elif mapped_timeframe == '1s' and hasattr(self.data_provider, 'latest_prices'):
|
||||
if symbol in self.data_provider.latest_prices:
|
||||
price = self.data_provider.latest_prices[symbol]
|
||||
candle_data = {
|
||||
'open': price,
|
||||
'high': price,
|
||||
'low': price,
|
||||
'close': price,
|
||||
'volume': 0,
|
||||
'timestamp': datetime.now()
|
||||
}
|
||||
|
||||
return candle_data
|
||||
|
||||
except Exception as e:
|
||||
logger.debug(f"Error getting candle data for {symbol} {timeframe}: {e}")
|
||||
return None
|
||||
|
||||
def _get_spx_data(self) -> Optional[Dict[str, Any]]:
|
||||
"""Get SPX data - placeholder for now"""
|
||||
# For now, return mock SPX data
|
||||
# In production, this would connect to a stock data provider
|
||||
return {
|
||||
'open': 5500.0,
|
||||
'high': 5520.0,
|
||||
'low': 5495.0,
|
||||
'close': 5510.0,
|
||||
'volume': 1000000,
|
||||
'timestamp': datetime.now()
|
||||
}
|
||||
|
||||
# Integration helper functions
|
||||
def setup_text_export(data_provider=None, orchestrator=None, config: Optional[Dict[str, Any]] = None) -> TextExportManager:
|
||||
"""Setup text export with default configuration"""
|
||||
manager = TextExportManager(data_provider, orchestrator)
|
||||
|
||||
if config:
|
||||
manager.export_config.update(config)
|
||||
|
||||
return manager
|
||||
|
||||
def start_text_export_service(data_provider=None, orchestrator=None, auto_start: bool = True) -> TextExportManager:
|
||||
"""Start text export service with auto-initialization"""
|
||||
manager = setup_text_export(data_provider, orchestrator)
|
||||
|
||||
if auto_start:
|
||||
if manager.initialize_exporter():
|
||||
manager.start_export()
|
||||
logger.info("Text export service started successfully")
|
||||
else:
|
||||
logger.error("Failed to start text export service")
|
||||
|
||||
return manager
|
Reference in New Issue
Block a user