#!/usr/bin/env python3
"""
Text Data Exporter - Delimited Text Interface for External Systems

Exports market data as tab- or pipe-delimited text files for integration
with text-based systems.
"""

import os
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


@dataclass
class MarketDataPoint:
    """Single market data point"""
    symbol: str
    timeframe: str
    open: float
    high: float
    low: float
    close: float
    volume: float
    timestamp: datetime


class TextDataExporter:
    """
    Exports market data to delimited text files for external text-based systems

    Features:
    - Multi-symbol support (MAIN + REF1 + REF2 + REF3)
    - Multi-timeframe (1s, 1m, 1h, 1d)
    - File rotation every minute
    - Overwrites within the same minute
    - Thread-safe operations
    """

    def __init__(self, data_provider=None,
                 export_dir: str = "NN/training/samples/txt",
                 main_symbol: str = "ETH/USDT",
                 ref1_symbol: str = "BTC/USDT",
                 ref2_symbol: str = "SPX",
                 ref3_symbol: str = "SOL/USDT",
                 export_format: str = "PIPE"):
        """
        Initialize text data exporter

        Args:
            data_provider: Data provider instance
            export_dir: Directory for text exports
            main_symbol: Main trading symbol (ETH)
            ref1_symbol: Reference symbol 1 (BTC)
            ref2_symbol: Reference symbol 2 (SPX)
            ref3_symbol: Reference symbol 3 (SOL)
            export_format: Output format, "PIPE" (default) or "TAB"
        """
        self.data_provider = data_provider
        self.export_dir = export_dir
        self.main_symbol = main_symbol
        self.ref1_symbol = ref1_symbol
        self.ref2_symbol = ref2_symbol
        self.ref3_symbol = ref3_symbol
        self.export_format = export_format.upper() if isinstance(export_format, str) else "PIPE"

        # Timeframes to export
        self.timeframes = ['1s', '1m', '1h', '1d']

        # File management
        self.current_minute = None
        self.current_filename = None
        self.export_lock = threading.Lock()

        # Running state
        self.is_running = False
        self.export_thread = None

        # Create export directory
        os.makedirs(self.export_dir, exist_ok=True)

        logger.info(f"Text Data Exporter initialized - Export dir: {self.export_dir}")
        logger.info(f"Symbols: MAIN={main_symbol}, REF1={ref1_symbol}, REF2={ref2_symbol}, REF3={ref3_symbol}")

    def start(self):
        """Start the data export process"""
        if self.is_running:
            logger.warning("Text data exporter already running")
            return

        self.is_running = True
        self.export_thread = threading.Thread(target=self._export_loop, daemon=True)
        self.export_thread.start()
        logger.info("Text data exporter started")

    def stop(self):
        """Stop the data export process"""
        self.is_running = False
        if self.export_thread:
            self.export_thread.join(timeout=5)
        logger.info("Text data exporter stopped")

    def _export_loop(self):
        """Main export loop - runs every second"""
        while self.is_running:
            try:
                self._export_current_data()
                time.sleep(1)  # Export every second
            except Exception as e:
                logger.error(f"Error in export loop: {e}")
                time.sleep(1)

    def _export_current_data(self):
        """Export current market data to the current text file"""
        try:
            current_time = datetime.now()
            current_minute_key = current_time.strftime("%Y%m%d_%H%M")

            # Check if we need a new file (new minute)
            if self.current_minute != current_minute_key:
                self.current_minute = current_minute_key
                self.current_filename = f"market_data_{current_minute_key}.txt"
                logger.info(f"Starting new export file: {self.current_filename}")

            # Gather data for all symbols and timeframes
            export_data = self._gather_export_data()

            if export_data:
                self._write_csv_file(export_data)
            else:
                logger.debug("No data available for export")

        except Exception as e:
            logger.error(f"Error exporting data: {e}")

    def _gather_export_data(self) -> List[Dict[str, Any]]:
        """Gather market data for all symbols and timeframes"""
        export_rows = []

        if not self.data_provider:
            return export_rows

        symbols = [
            ("MAIN", self.main_symbol),
            ("REF1", self.ref1_symbol),
            ("REF2", self.ref2_symbol),
            ("REF3", self.ref3_symbol)
        ]

        for symbol_type, symbol in symbols:
            for timeframe in self.timeframes:
                try:
                    # Get latest data for this symbol/timeframe
                    data_point = self._get_latest_data(symbol, timeframe)
                    if data_point:
                        export_rows.append({
                            'symbol_type': symbol_type,
                            'symbol': symbol,
                            'timeframe': timeframe,
                            'open': data_point.open,
                            'high': data_point.high,
                            'low': data_point.low,
                            'close': data_point.close,
                            'volume': data_point.volume,
                            'timestamp': data_point.timestamp
                        })
                except Exception as e:
                    logger.debug(f"Error getting data for {symbol} {timeframe}: {e}")

        return export_rows

    def _get_latest_data(self, symbol: str, timeframe: str) -> Optional[MarketDataPoint]:
        """Get latest market data for symbol/timeframe"""
        try:
            candle = None

            # Try direct method
            if hasattr(self.data_provider, 'get_latest_candle'):
                candle = self.data_provider.get_latest_candle(symbol, timeframe)

            # Fallback to last row of historical data
            if (not candle) and hasattr(self.data_provider, 'get_historical_data'):
                try:
                    df = self.data_provider.get_historical_data(symbol, timeframe, limit=1)
                    if df is not None and not df.empty:
                        latest = df.iloc[-1]
                        ts = df.index[-1]
                        candle = {
                            'open': latest.get('open', 0),
                            'high': latest.get('high', 0),
                            'low': latest.get('low', 0),
                            'close': latest.get('close', 0),
                            'volume': latest.get('volume', 0),
                            'timestamp': ts.to_pydatetime() if hasattr(ts, 'to_pydatetime') else ts
                        }
                except Exception as _ex:
                    logger.debug(f"hist fallback failed for {symbol} {timeframe}: {_ex}")

            if not candle:
                return None

            # Convert to MarketDataPoint
            return MarketDataPoint(
                symbol=symbol,
                timeframe=timeframe,
                open=float(candle.get('open', 0)),
                high=float(candle.get('high', 0)),
                low=float(candle.get('low', 0)),
                close=float(candle.get('close', 0)),
                volume=float(candle.get('volume', 0)),
                timestamp=candle.get('timestamp', datetime.now())
            )

        except Exception as e:
            logger.debug(f"Error getting latest data for {symbol} {timeframe}: {e}")
            return None

    def _write_csv_file(self, export_data: List[Dict[str, Any]]):
        """Write data to a text file in the configured format (TAB or PIPE)"""
        if not export_data:
            return

        filepath = os.path.join(self.export_dir, self.current_filename)

        with self.export_lock:
            try:
                # Group data by symbol type for organized output
                grouped_data = self._group_data_by_symbol(export_data)

                with open(filepath, 'w', encoding='utf-8') as txtfile:
                    if self.export_format == 'TAB':
                        self._write_tab_format(txtfile, grouped_data)
                    else:
                        self._write_pipe_format(txtfile, grouped_data)

                logger.debug(f"Exported {len(export_data)} data points to {filepath}")

            except Exception as e:
                logger.error(f"Error writing TXT file {filepath}: {e}")

    # NOTE: _create_csv_header and _format_csv_rows implement the earlier
    # flat-CSV layout; the TAB/PIPE writers below do not call them.
    def _create_csv_header(self) -> List[str]:
        """Create CSV header based on specification"""
        header = ['symbol']

        # Add columns for each symbol type and timeframe
        for symbol_type in ['MAIN', 'REF1', 'REF2']:
            for timeframe in self.timeframes:
                prefix = f"{symbol_type}_{timeframe}"
                header.extend([
                    f"{prefix}_O",  # Open
                    f"{prefix}_H",  # High
                    f"{prefix}_L",  # Low
                    f"{prefix}_C",  # Close
                    f"{prefix}_V",  # Volume
                    f"{prefix}_T"   # Timestamp
                ])

        return header

    def _group_data_by_symbol(self, export_data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Dict[str, Any]]]:
        """Group data by symbol type and timeframe"""
        grouped = {}

        for data_point in export_data:
            symbol_type = data_point['symbol_type']
            timeframe = data_point['timeframe']

            if symbol_type not in grouped:
                grouped[symbol_type] = {}

            grouped[symbol_type][timeframe] = data_point

        return grouped

    def _format_csv_rows(self, grouped_data: Dict[str, Dict[str, Dict[str, Any]]]) -> List[Dict[str, Any]]:
        """Format data into CSV rows"""
        rows = []

        # Create a single row with all data
        row = {'symbol': f"{self.main_symbol.split('/')[0]} ({self.ref1_symbol.split('/')[0]}, {self.ref2_symbol})"}

        for symbol_type in ['MAIN', 'REF1', 'REF2']:
            symbol_data = grouped_data.get(symbol_type, {})

            for timeframe in self.timeframes:
                prefix = f"{symbol_type}_{timeframe}"
                data_point = symbol_data.get(timeframe)

                if data_point:
                    row[f"{prefix}_O"] = f"{data_point['open']:.6f}"
                    row[f"{prefix}_H"] = f"{data_point['high']:.6f}"
                    row[f"{prefix}_L"] = f"{data_point['low']:.6f}"
                    row[f"{prefix}_C"] = f"{data_point['close']:.6f}"
                    row[f"{prefix}_V"] = f"{data_point['volume']:.2f}"
                    row[f"{prefix}_T"] = data_point['timestamp'].strftime("%Y-%m-%d %H:%M:%S")
                else:
                    # Empty values if no data
                    row[f"{prefix}_O"] = ""
                    row[f"{prefix}_H"] = ""
                    row[f"{prefix}_L"] = ""
                    row[f"{prefix}_C"] = ""
                    row[f"{prefix}_V"] = ""
                    row[f"{prefix}_T"] = ""

        rows.append(row)
        return rows

    def _write_tab_format(self, txtfile, grouped_data: Dict[str, Dict[str, Dict[str, Any]]]):
        """Write data in tab-separated format like readme.md sample"""
        # Write header structure: each symbol/timeframe group spans 6 columns
        # (O, H, L, C, V, Timestamp); 7 groups in total, so MAIN's four
        # timeframes cover 24 columns and each REF group covers 6.
        txtfile.write("symbol\tMAIN SYMBOL (ETH)" + "\t" * 24 +
                      "REF1 (BTC)" + "\t" * 6 +
                      "REF2 (SPX)" + "\t" * 6 +
                      "REF3 (SOL)\n")
        txtfile.write("timeframe\t" + ("\t" * 6).join(['1s', '1m', '1h', '1d', '1s', '1s', '1s']) + "\n")
        txtfile.write("datapoint\t" + "\t".join(['O', 'H', 'L', 'C', 'V', 'Timestamp'] * 7) + "\n")

        # Build up to 300 rows using historical 1s data when available
        # Collect timestamps from MAIN 1s historical data
        timestamps: List[Any] = []
        hist_df = None
        try:
            if hasattr(self.data_provider, 'get_historical_data'):
                hist_df = self.data_provider.get_historical_data(self.main_symbol, '1s', limit=300)
                if hist_df is not None and not hist_df.empty:
                    timestamps = list(hist_df.index[-300:])
        except Exception as _e:
            logger.debug(f"hist 1s not available: {_e}")

        if not timestamps:
            timestamps = [datetime.utcnow()]

        # Fetch snapshots for non-1s timeframes
        main_data = grouped_data.get('MAIN', {})
        main_1m = main_data.get('1m')
        main_1h = main_data.get('1h')
        main_1d = main_data.get('1d')
        ref1_1s = grouped_data.get('REF1', {}).get('1s')
        ref2_1s = grouped_data.get('REF2', {}).get('1s')
        ref3_1s = grouped_data.get('REF3', {}).get('1s')

        for ts in timestamps:
            try:
                ts_str = ts.strftime("%Y-%m-%dT%H:%M:%SZ") if hasattr(ts, 'strftime') else str(ts)
            except Exception:
                ts_str = ""

            row_parts = [ts_str]

            # MAIN 1s from historical row if possible
            if hist_df is not None and ts in hist_df.index:
                r = hist_df.loc[ts]
                row_parts.extend([
                    f"{float(r.get('open', 0) or 0):.2f}",
                    f"{float(r.get('high', 0) or 0):.2f}",
                    f"{float(r.get('low', 0) or 0):.2f}",
                    f"{float(r.get('close', 0) or 0):.2f}",
                    f"{float(r.get('volume', 0) or 0):.1f}",
                    ts_str
                ])
            else:
                snap = main_data.get('1s')
                if snap:
                    row_parts.extend([
                        f"{snap['open']:.2f}",
                        f"{snap['high']:.2f}",
                        f"{snap['low']:.2f}",
                        f"{snap['close']:.2f}",
                        f"{snap['volume']:.1f}",
                        ts_str
                    ])
                else:
                    row_parts.extend(["0", "0", "0", "0", "0", ts_str])

            # MAIN 1m/1h/1d snapshots
            for snap in [main_1m, main_1h, main_1d]:
                if snap:
                    row_parts.extend([
                        f"{snap['open']:.2f}",
                        f"{snap['high']:.2f}",
                        f"{snap['low']:.2f}",
                        f"{snap['close']:.2f}",
                        f"{snap['volume']:.1f}",
                        snap['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
                    ])
                else:
                    row_parts.extend(["0", "0", "0", "0", "0", ts_str])

            # REF1/REF2/REF3 1s snapshots
            for snap in [ref1_1s, ref2_1s, ref3_1s]:
                if snap:
                    row_parts.extend([
                        f"{snap['open']:.2f}",
                        f"{snap['high']:.2f}",
                        f"{snap['low']:.2f}",
                        f"{snap['close']:.2f}",
                        f"{snap['volume']:.1f}",
                        snap['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
                    ])
                else:
                    row_parts.extend(["0", "0", "0", "0", "0", ts_str])

            txtfile.write("\t".join(row_parts) + "\n")

    def _write_pipe_format(self, txtfile, grouped_data: Dict[str, Dict[str, Dict[str, Any]]]):
        """Write data in a pipe-delimited, console-friendly grid."""
        from io import StringIO

        # Render the tab format into a buffer, then re-delimit with pipes
        buffer = StringIO()
        self._write_tab_format(buffer, grouped_data)
        content = buffer.getvalue().splitlines()
        if not content:
            return

        headers = [line.split('\t') for line in content[:3]]
        data_rows = [line.split('\t') for line in content[3:]]

        def to_pipe(row: List[str]) -> str:
            return "|" + "|".join(str(col) for col in row) + "|"

        # Render header rows with separators
        for hdr in headers:
            txtfile.write(to_pipe(hdr) + "\n")
            txtfile.write("|" + "|".join(["-" * max(3, len(str(c))) for c in hdr]) + "|\n")

        # Render data
        for row in data_rows:
            txtfile.write(to_pipe(row) + "\n")

    def get_current_filename(self) -> Optional[str]:
        """Get current export filename"""
        return self.current_filename

    def get_export_stats(self) -> Dict[str, Any]:
        """Get export statistics"""
        stats = {
            'is_running': self.is_running,
            'export_dir': self.export_dir,
            'current_filename': self.current_filename,
            'symbols': {
                'main': self.main_symbol,
                'ref1': self.ref1_symbol,
                'ref2': self.ref2_symbol,
                'ref3': self.ref3_symbol
            },
            'timeframes': self.timeframes
        }

        # Add file count
        try:
            files = [f for f in os.listdir(self.export_dir) if f.endswith('.txt')]
            stats['total_files'] = len(files)
        except OSError:
            stats['total_files'] = 0

        return stats


# Convenience function for integration
def create_text_exporter(data_provider=None, **kwargs) -> TextDataExporter:
    """Create and return a TextDataExporter instance"""
    return TextDataExporter(data_provider=data_provider, **kwargs)
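

# ---------------------------------------------------------------------------
# Minimal usage sketch. The exporter only needs a provider object that
# exposes get_latest_candle(symbol, timeframe) and/or
# get_historical_data(symbol, timeframe, limit); _StubDataProvider below is a
# hypothetical stand-in for the project's real data provider, included only
# so the module can be smoke-tested on its own.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import random

    class _StubDataProvider:
        """Returns synthetic candles so the export loop has data to write."""

        def get_latest_candle(self, symbol, timeframe):
            price = 100.0 + random.random()
            return {
                'open': price, 'high': price + 0.5, 'low': price - 0.5,
                'close': price + 0.1, 'volume': 42.0,
                'timestamp': datetime.now()
            }

    logging.basicConfig(level=logging.INFO)
    exporter = create_text_exporter(_StubDataProvider(), export_dir="txt_export_demo")
    exporter.start()
    try:
        time.sleep(3)  # let a few one-second export cycles run
    finally:
        exporter.stop()
    print(f"Last file written: {exporter.get_current_filename()}")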