Files
gogo2/core/text_data_exporter.py
2025-08-26 18:37:00 +03:00

364 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Text Data Exporter - Tab-Separated Text Interface for External Systems
Exports market data as tab-separated .txt files for integration with text-based systems
"""
import csv
import io
import logging
import os
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class MarketDataPoint:
    """One OHLCV sample for a single symbol/timeframe pair."""

    # What the sample belongs to
    symbol: str
    timeframe: str

    # Price fields
    open: float
    high: float
    low: float
    close: float

    # Traded volume reported with the prices
    volume: float

    # Time attached to the sample by whoever built it
    timestamp: datetime
class TextDataExporter:
    """
    Export market data to tab-separated text files for external text-based systems.

    Features:
    - Multi-symbol support (MAIN + REF1 + REF2)
    - Multi-timeframe (1s, 1m, 1h, 1d)
    - File rotation every minute
    - Overwrites within the same minute
    - File writes serialized through ``export_lock``
    """

    # Timestamp layout of the tab-separated export rows.
    # NOTE(review): the row timestamp and the placeholders come from
    # datetime.now() (naive local time) yet carry a literal "Z" suffix -
    # confirm that consumers do not interpret them as UTC.
    _ROW_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
    # Timestamp layout used by _format_csv_rows (wide single-row layout).
    _CSV_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
    _SYMBOL_TYPES = ('MAIN', 'REF1', 'REF2')
    _FIELD_SUFFIXES = ('O', 'H', 'L', 'C', 'V', 'T')
    # Header lines are fixed text: they name ETH/BTC/SPX/SOL regardless of the
    # symbols configured on the instance, and are kept byte-for-byte because
    # external readers parse them.
    _TAB_HEADER_LINES = (
        "symbol\tMAIN SYMBOL (ETH)\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tREF1 (BTC)\t\t\t\t\t\tREF2 (SPX)\t\t\t\t\t\tREF3 (SOL)\n",
        "timeframe\t1s\t\t\t\t\t\t1m\t\t\t\t\t\t1h\t\t\t\t\t\t1d\t\t\t\t\t\t1s\t\t\t\t\t\t1s\t\t\t\t\t\t1s\n",
        "datapoint\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\n",
    )

    def __init__(self,
                 data_provider=None,
                 export_dir: str = "NN/training/samples/txt",
                 main_symbol: str = "ETH/USDT",
                 ref1_symbol: str = "BTC/USDT",
                 ref2_symbol: str = "SPX"):
        """
        Initialize text data exporter

        Creates ``export_dir`` if it does not exist yet.

        Args:
            data_provider: Data provider instance; candles are read through its
                ``get_latest_candle(symbol, timeframe)`` method when present
            export_dir: Directory for the exported files
            main_symbol: Main trading symbol (ETH)
            ref1_symbol: Reference symbol 1 (BTC)
            ref2_symbol: Reference symbol 2 (SPX)
        """
        self.data_provider = data_provider
        self.export_dir = export_dir
        self.main_symbol = main_symbol
        self.ref1_symbol = ref1_symbol
        self.ref2_symbol = ref2_symbol

        # Timeframes to export
        self.timeframes = ['1s', '1m', '1h', '1d']

        # File management
        self.current_minute = None
        self.current_filename = None
        self.export_lock = threading.Lock()

        # Running state
        self.is_running = False
        self.export_thread = None

        # Create export directory
        os.makedirs(self.export_dir, exist_ok=True)

        logger.info("Text Data Exporter initialized - Export dir: %s", self.export_dir)
        logger.info("Symbols: MAIN=%s, REF1=%s, REF2=%s", main_symbol, ref1_symbol, ref2_symbol)

    def start(self):
        """Start the background export thread (no-op with a warning if already running)."""
        if self.is_running:
            logger.warning("Text data exporter already running")
            return

        self.is_running = True
        self.export_thread = threading.Thread(target=self._export_loop, daemon=True)
        self.export_thread.start()
        logger.info("Text data exporter started")

    def stop(self):
        """Ask the export loop to finish and wait up to 5 seconds for its thread."""
        self.is_running = False
        worker = self.export_thread
        if worker:
            worker.join(timeout=5)
        logger.info("Text data exporter stopped")

    def _export_loop(self):
        """Main export loop - exports, then sleeps one second, until stopped."""
        while self.is_running:
            try:
                self._export_current_data()
            except Exception as e:
                logger.error("Error in export loop: %s", e)
            # Pause between exports whether or not the last one succeeded
            time.sleep(1)

    def _export_current_data(self):
        """Rotate the target file when the minute changes, then export the latest data."""
        try:
            minute_key = datetime.now().strftime("%Y%m%d_%H%M")

            # Check if we need a new file (new minute)
            if self.current_minute != minute_key:
                self.current_minute = minute_key
                self.current_filename = f"market_data_{minute_key}.txt"
                logger.info("Starting new export file: %s", self.current_filename)

            # Gather data for all symbols and timeframes
            export_data = self._gather_export_data()
            if export_data:
                self._write_csv_file(export_data)
            else:
                logger.debug("No data available for export")
        except Exception as e:
            logger.error("Error exporting data: %s", e)

    def _gather_export_data(self) -> List[Dict[str, Any]]:
        """
        Gather market data for all symbols and timeframes

        Returns:
            One dict per symbol/timeframe pair that yielded data; an empty list
            when no data provider is configured.
        """
        export_rows: List[Dict[str, Any]] = []
        if not self.data_provider:
            return export_rows

        symbols = [
            ("MAIN", self.main_symbol),
            ("REF1", self.ref1_symbol),
            ("REF2", self.ref2_symbol),
        ]
        for symbol_type, symbol in symbols:
            for timeframe in self.timeframes:
                try:
                    # Get latest data for this symbol/timeframe
                    data_point = self._get_latest_data(symbol, timeframe)
                    if data_point:
                        export_rows.append({
                            'symbol_type': symbol_type,
                            'symbol': symbol,
                            'timeframe': timeframe,
                            'open': data_point.open,
                            'high': data_point.high,
                            'low': data_point.low,
                            'close': data_point.close,
                            'volume': data_point.volume,
                            'timestamp': data_point.timestamp,
                        })
                except Exception as e:
                    logger.debug("Error getting data for %s %s: %s", symbol, timeframe, e)
        return export_rows

    def _get_latest_data(self, symbol: str, timeframe: str) -> Optional["MarketDataPoint"]:
        """
        Get latest market data for symbol/timeframe

        Returns:
            A MarketDataPoint built from the provider's latest candle, or None
            when the provider lacks ``get_latest_candle``, returns nothing, or
            the candle cannot be converted.
        """
        try:
            fetch_candle = getattr(self.data_provider, 'get_latest_candle', None)
            if fetch_candle is None:
                return None

            # Try to get latest candle data
            candle = fetch_candle(symbol, timeframe)
            if candle is None:
                return None
            # Test emptiness through len(): a plain truthiness test may raise
            # for array-like candles (e.g. pandas objects), which would discard
            # otherwise valid data.
            if hasattr(candle, '__len__') and len(candle) == 0:
                return None

            # A key that is present but None falls back to "now" as well
            timestamp = candle.get('timestamp')
            if timestamp is None:
                timestamp = datetime.now()

            # Convert to MarketDataPoint
            return MarketDataPoint(
                symbol=symbol,
                timeframe=timeframe,
                open=float(candle.get('open', 0)),
                high=float(candle.get('high', 0)),
                low=float(candle.get('low', 0)),
                close=float(candle.get('close', 0)),
                volume=float(candle.get('volume', 0)),
                timestamp=timestamp,
            )
        except Exception as e:
            logger.debug("Error getting latest data for %s %s: %s", symbol, timeframe, e)
            return None

    def _write_csv_file(self, export_data: List[Dict[str, Any]]):
        """
        Write data to TXT file in tab-separated format

        The file named by ``current_filename`` is overwritten on every call.
        Errors are logged, not raised.
        """
        if not export_data:
            return

        filepath = os.path.join(self.export_dir, self.current_filename)
        with self.export_lock:
            try:
                # Group data by symbol type for organized output
                grouped_data = self._group_data_by_symbol(export_data)

                # Render in memory first: opening with 'w' truncates the file,
                # so a formatting error after the open would leave readers
                # with a header-only file.
                buffer = io.StringIO()
                self._write_tab_format(buffer, grouped_data)

                with open(filepath, 'w', encoding='utf-8') as txtfile:
                    txtfile.write(buffer.getvalue())
                logger.debug("Exported %s data points to %s", len(export_data), filepath)
            except Exception as e:
                logger.error("Error writing TXT file %s: %s", filepath, e)

    def _create_csv_header(self) -> List[str]:
        """Create the wide header: 'symbol' plus O/H/L/C/V/T columns per symbol type and timeframe."""
        header = ['symbol']
        # Add columns for each symbol type and timeframe
        for symbol_type in self._SYMBOL_TYPES:
            for timeframe in self.timeframes:
                prefix = f"{symbol_type}_{timeframe}"
                header.extend(f"{prefix}_{suffix}" for suffix in self._FIELD_SUFFIXES)
        return header

    def _group_data_by_symbol(self, export_data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Dict[str, Any]]]:
        """
        Group data by symbol type and timeframe

        Returns:
            ``{symbol_type: {timeframe: data_point}}``; a later entry for the
            same pair replaces an earlier one.
        """
        grouped: Dict[str, Dict[str, Dict[str, Any]]] = {}
        for data_point in export_data:
            by_timeframe = grouped.setdefault(data_point['symbol_type'], {})
            by_timeframe[data_point['timeframe']] = data_point
        return grouped

    @staticmethod
    def _format_timestamp(value: Any, fmt: str) -> str:
        """Format ``value`` with its ``strftime`` when it has one, else return ``str(value)``."""
        strftime = getattr(value, 'strftime', None)
        if callable(strftime):
            return strftime(fmt)
        # Providers may hand back strings or numbers; keep the row writable
        return str(value)

    def _format_csv_rows(self, grouped_data: Dict[str, Dict[str, Dict[str, Any]]]) -> List[Dict[str, Any]]:
        """
        Format data into CSV rows

        Returns:
            A single-element list holding one wide row keyed like the header
            from ``_create_csv_header``; missing data yields empty strings.
        """
        # Create a single row with all data
        row = {'symbol': f"{self.main_symbol.split('/')[0]} ({self.ref1_symbol.split('/')[0]}, {self.ref2_symbol})"}

        for symbol_type in self._SYMBOL_TYPES:
            symbol_data = grouped_data.get(symbol_type, {})
            for timeframe in self.timeframes:
                prefix = f"{symbol_type}_{timeframe}"
                data_point = symbol_data.get(timeframe)
                if data_point:
                    values = (
                        f"{data_point['open']:.6f}",
                        f"{data_point['high']:.6f}",
                        f"{data_point['low']:.6f}",
                        f"{data_point['close']:.6f}",
                        f"{data_point['volume']:.2f}",
                        self._format_timestamp(data_point['timestamp'], self._CSV_TIMESTAMP_FORMAT),
                    )
                else:
                    # Empty values if no data
                    values = ("",) * len(self._FIELD_SUFFIXES)
                for suffix, value in zip(self._FIELD_SUFFIXES, values):
                    row[f"{prefix}_{suffix}"] = value
        return [row]

    def _tab_fields(self, data_point: Optional[Dict[str, Any]], placeholder_timestamp: str) -> List[str]:
        """Return the six O/H/L/C/V/Timestamp cells for one data point (zeros when missing)."""
        if not data_point:
            return ["0", "0", "0", "0", "0", placeholder_timestamp]
        return [
            f"{data_point['open']:.2f}",
            f"{data_point['high']:.2f}",
            f"{data_point['low']:.2f}",
            f"{data_point['close']:.2f}",
            f"{data_point['volume']:.1f}",
            self._format_timestamp(data_point['timestamp'], self._ROW_TIMESTAMP_FORMAT),
        ]

    def _write_tab_format(self, txtfile, grouped_data: Dict[str, Dict[str, Dict[str, Any]]]):
        """
        Write data in tab-separated format like readme.md sample

        Emits three fixed header lines and one data row: the current time,
        MAIN for 1s/1m/1h/1d, REF1 and REF2 for 1s, and a zero placeholder for
        REF3.

        Args:
            txtfile: Writable text stream
            grouped_data: Output of ``_group_data_by_symbol``
        """
        # Write header structure
        for header_line in self._TAB_HEADER_LINES:
            txtfile.write(header_line)

        # Timestamp first; the same value stands in wherever data is missing
        now_text = datetime.now().strftime(self._ROW_TIMESTAMP_FORMAT)
        row_parts = [now_text]

        # MAIN data for all timeframes; fixed list because the header is fixed
        main_data = grouped_data.get('MAIN', {})
        for timeframe in ('1s', '1m', '1h', '1d'):
            row_parts.extend(self._tab_fields(main_data.get(timeframe), now_text))

        # REF1 (BTC), REF2 (SPX) - 1s timeframe only
        for ref_type in ('REF1', 'REF2'):  # REF3 will be added by LLM proxy
            ref_data = grouped_data.get(ref_type, {})
            row_parts.extend(self._tab_fields(ref_data.get('1s'), now_text))

        # Add placeholder for REF3 (SOL) - will be filled by LLM proxy
        row_parts.extend(self._tab_fields(None, now_text))

        txtfile.write("\t".join(row_parts) + "\n")

    def get_current_filename(self) -> Optional[str]:
        """Get current export filename (None until the first export cycle has run)."""
        return self.current_filename

    def get_export_stats(self) -> Dict[str, Any]:
        """
        Get export statistics

        Returns:
            Running state, export directory, current filename, configured
            symbols and timeframes, plus ``total_files``: the number of
            ``.txt`` files in the export directory (0 if it cannot be listed).
        """
        stats = {
            'is_running': self.is_running,
            'export_dir': self.export_dir,
            'current_filename': self.current_filename,
            'symbols': {
                'main': self.main_symbol,
                'ref1': self.ref1_symbol,
                'ref2': self.ref2_symbol,
            },
            'timeframes': self.timeframes,
        }

        # Add file count; only filesystem errors are treated as "no files"
        try:
            stats['total_files'] = sum(
                1 for name in os.listdir(self.export_dir) if name.endswith('.txt')
            )
        except OSError:
            stats['total_files'] = 0
        return stats
# Factory entry point for code that integrates the exporter
def create_text_exporter(data_provider=None, **kwargs) -> TextDataExporter:
    """Build a TextDataExporter for ``data_provider``, forwarding any extra keyword options."""
    exporter = TextDataExporter(data_provider=data_provider, **kwargs)
    return exporter