#!/usr/bin/env python3
"""
Text Export Integration - Connects TextDataExporter with existing data systems
"""

import logging
from typing import Optional, Dict, Any
from datetime import datetime

from .text_data_exporter import TextDataExporter

logger = logging.getLogger(__name__)


class TextExportManager:
    """
    Manages text data export integration with the trading system.

    Owns a lazily created ``TextDataExporter`` plus the configuration used to
    build it, and exposes start/stop/status/reconfigure operations that report
    failures through boolean return values instead of raising.
    """

    def __init__(self, data_provider=None, orchestrator=None):
        """
        Initialize text export manager.

        Args:
            data_provider: Main data provider instance
            orchestrator: Trading orchestrator instance
        """
        self.data_provider = data_provider
        self.orchestrator = orchestrator
        # Created on demand by initialize_exporter(); None means "not built".
        self.text_exporter: Optional[TextDataExporter] = None

        # Configuration
        self.export_enabled = False
        self.export_config = {
            'main_symbol': 'ETH/USDT',
            'ref1_symbol': 'BTC/USDT',
            'ref2_symbol': 'SPX',  # Will need to be mapped to available data
            'ref3_symbol': 'SOL/USDT',
            'export_dir': 'NN/training/samples/txt',
            'export_format': 'PIPE',  # PIPE (default) or TAB
        }

    def initialize_exporter(self, config: Optional[Dict[str, Any]] = None) -> bool:
        """
        Build the text data exporter from the current configuration.

        Args:
            config: Optional overrides merged into ``export_config`` before
                the exporter is created.

        Returns:
            True when the exporter was created, False when any step raised
            (the error is logged, not propagated).
        """
        try:
            if config:
                self.export_config.update(config)

            # Create enhanced data provider wrapper
            enhanced_provider = EnhancedDataProviderWrapper(
                self.data_provider,
                self.orchestrator,
            )

            # Create text exporter
            self.text_exporter = TextDataExporter(
                data_provider=enhanced_provider,
                export_dir=self.export_config['export_dir'],
                main_symbol=self.export_config['main_symbol'],
                ref1_symbol=self.export_config['ref1_symbol'],
                ref2_symbol=self.export_config['ref2_symbol'],
                ref3_symbol=self.export_config.get('ref3_symbol', 'SOL/USDT'),
                export_format=self.export_config.get('export_format', 'PIPE'),
            )

            logger.info("Text data exporter initialized successfully")
            return True

        except Exception as e:
            logger.error("Error initializing text exporter: %s", e)
            return False

    def start_export(self) -> bool:
        """
        Start text data export, initializing the exporter first if needed.

        Returns:
            True when the exporter's ``start()`` completed, False when
            initialization failed or ``start()`` raised.
        """
        if not self.text_exporter:
            if not self.initialize_exporter():
                logger.error("Cannot start export - initialization failed")
                return False

        try:
            self.text_exporter.start()
            self.export_enabled = True
            logger.info("Text data export started")
            return True
        except Exception as e:
            logger.error("Error starting text export: %s", e)
            return False

    def stop_export(self) -> bool:
        """
        Stop text data export.

        Returns:
            True when there was nothing to stop or ``stop()`` completed,
            False when ``stop()`` raised (``export_enabled`` is then left
            unchanged).
        """
        if self.text_exporter:
            try:
                self.text_exporter.stop()
                self.export_enabled = False
                logger.info("Text data export stopped")
                return True
            except Exception as e:
                logger.error("Error stopping text export: %s", e)
                return False
        return True

    def get_export_status(self) -> Dict[str, Any]:
        """
        Get current export status.

        Returns:
            A dict with ``enabled``, ``initialized`` and a copy of ``config``;
            when an exporter exists, the entries of its ``get_export_stats()``
            are merged on top.
        """
        status = {
            'enabled': self.export_enabled,
            'initialized': self.text_exporter is not None,
            'config': self.export_config.copy(),
        }

        if self.text_exporter:
            status.update(self.text_exporter.get_export_stats())

        return status

    def update_config(self, new_config: Dict[str, Any]) -> None:
        """
        Update export configuration.

        A running export is stopped first and restarted afterwards. The
        current exporter is always discarded so the next start rebuilds it
        from the updated configuration.

        Args:
            new_config: Entries merged into ``export_config``.
        """
        old_enabled = self.export_enabled

        # Stop if running
        if old_enabled:
            self.stop_export()

        # Update config
        self.export_config.update(new_config)

        # Reinitialize: dropping the exporter forces start_export() to rebuild it
        self.text_exporter = None

        # Restart if was enabled
        if old_enabled:
            self.start_export()

        logger.info("Text export config updated: %s", new_config)


class EnhancedDataProviderWrapper:
    """
    Wrapper around the existing data provider to provide the interface
    expected by TextDataExporter.
    """

    def __init__(self, data_provider, orchestrator=None):
        """
        Store the wrapped provider and orchestrator.

        Args:
            data_provider: Object that candle data is read from
            orchestrator: Trading orchestrator instance (stored only)
        """
        self.data_provider = data_provider
        self.orchestrator = orchestrator

        # Timeframe mapping (unknown timeframes are passed through unchanged)
        self.timeframe_map = {
            '1s': '1s',
            '1m': '1m',
            '1h': '1h',
            '1d': '1d',
        }

    def get_latest_candle(self, symbol: str, timeframe: str) -> Optional[Dict[str, Any]]:
        """
        Get latest candle data for symbol/timeframe.

        Exactly one data source is consulted: the first of
        ``get_latest_candle``, ``candle_buffer`` or (1s timeframe only)
        ``latest_prices`` that the wrapped provider exposes. If that source
        has no data, later sources are NOT tried.

        Args:
            symbol: Symbol to look up; ``'SPX'`` is routed to
                ``_get_spx_data()``.
            timeframe: Requested timeframe, translated through
                ``timeframe_map``.

        Returns:
            The candle (a dict with open/high/low/close/volume/timestamp when
            built here, or whatever the provider's own ``get_latest_candle``
            returns), or None when no data is available or any error occurs.
        """
        try:
            # Handle special symbols
            if symbol == 'SPX':
                return self._get_spx_data()

            # Map timeframe
            mapped_timeframe = self.timeframe_map.get(timeframe, timeframe)
            provider = self.data_provider

            # Method 1: Direct candle data
            if hasattr(provider, 'get_latest_candle'):
                return provider.get_latest_candle(symbol, mapped_timeframe)

            # Method 2: From candle buffer
            if hasattr(provider, 'candle_buffer'):
                buffer_key = f"{symbol}_{mapped_timeframe}"
                if buffer_key not in provider.candle_buffer:
                    return None
                candles = provider.candle_buffer[buffer_key]
                if not candles:
                    return None
                latest = candles[-1]
                return {
                    'open': latest.get('open', 0),
                    'high': latest.get('high', 0),
                    'low': latest.get('low', 0),
                    'close': latest.get('close', 0),
                    'volume': latest.get('volume', 0),
                    'timestamp': latest.get('timestamp', datetime.now()),
                }

            # Method 3: From tick data (for 1s timeframe)
            if mapped_timeframe == '1s' and hasattr(provider, 'latest_prices'):
                if symbol in provider.latest_prices:
                    price = provider.latest_prices[symbol]
                    # A single tick price fills every OHLC field; volume is 0.
                    return {
                        'open': price,
                        'high': price,
                        'low': price,
                        'close': price,
                        'volume': 0,
                        'timestamp': datetime.now(),
                    }

            return None

        except Exception as e:
            # Best-effort lookup: failures are reported at debug level only.
            logger.debug("Error getting candle data for %s %s: %s", symbol, timeframe, e)
            return None

    def _get_spx_data(self) -> Optional[Dict[str, Any]]:
        """Get SPX data - placeholder for now; always returns None."""
        # No synthetic data allowed; return None if not available
        return None


# Integration helper functions
def setup_text_export(data_provider=None, orchestrator=None,
                      config: Optional[Dict[str, Any]] = None) -> TextExportManager:
    """
    Setup text export with default configuration.

    Args:
        data_provider: Main data provider instance
        orchestrator: Trading orchestrator instance
        config: Optional overrides merged into the manager's ``export_config``

    Returns:
        A new ``TextExportManager``; the exporter itself is not created yet.
    """
    manager = TextExportManager(data_provider, orchestrator)

    if config:
        manager.export_config.update(config)

    return manager


def start_text_export_service(data_provider=None, orchestrator=None,
                              auto_start: bool = True,
                              config: Optional[Dict[str, Any]] = None) -> TextExportManager:
    """
    Start text export service with auto-initialization.

    Args:
        data_provider: Main data provider instance
        orchestrator: Trading orchestrator instance
        auto_start: When True, initialize the exporter and start exporting
        config: Optional overrides forwarded to ``setup_text_export``

    Returns:
        The ``TextExportManager``, returned even when starting failed; check
        ``get_export_status()`` for the outcome.
    """
    manager = setup_text_export(data_provider, orchestrator, config)

    if auto_start:
        # Success is reported only when both steps succeed; start_export()
        # returns False (without raising) when the exporter fails to start.
        if manager.initialize_exporter() and manager.start_export():
            logger.info("Text export service started successfully")
        else:
            logger.error("Failed to start text export service")

    return manager