#!/usr/bin/env python3
"""
Text Data Exporter - Delimited Text Interface for External Systems

Exports market data as tab- or pipe-delimited text files for integration
with text-based systems.
"""

import os
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)


@dataclass
class MarketDataPoint:
    """Single market data point"""
    symbol: str
    timeframe: str
    open: float
    high: float
    low: float
    close: float
    volume: float
    timestamp: datetime


class TextDataExporter:
    """
    Exports market data to delimited text files for external text-based systems.

    Features:
    - Multi-symbol support (MAIN + REF1 + REF2 + REF3)
    - Multi-timeframe (1s, 1m, 1h, 1d)
    - File rotation every minute
    - Overwrites within the same minute
    - Thread-safe operations
    """

    def __init__(self,
                 data_provider=None,
                 export_dir: str = "NN/training/samples/txt",
                 main_symbol: str = "ETH/USDT",
                 ref1_symbol: str = "BTC/USDT",
                 ref2_symbol: str = "SPX",
                 ref3_symbol: str = "SOL/USDT",
                 export_format: str = "PIPE"):
        """
        Initialize text data exporter

        Args:
            data_provider: Data provider instance
            export_dir: Directory for text exports
            main_symbol: Main trading symbol (ETH)
            ref1_symbol: Reference symbol 1 (BTC)
            ref2_symbol: Reference symbol 2 (SPX)
            ref3_symbol: Reference symbol 3 (SOL)
            export_format: Output format, "PIPE" (default) or "TAB"
        """
        self.data_provider = data_provider
        self.export_dir = export_dir
        self.main_symbol = main_symbol
        self.ref1_symbol = ref1_symbol
        self.ref2_symbol = ref2_symbol
        self.ref3_symbol = ref3_symbol
        self.export_format = export_format.upper() if isinstance(export_format, str) else "PIPE"

        # Timeframes to export
        self.timeframes = ['1s', '1m', '1h', '1d']

        # File management
        self.current_minute = None
        self.current_filename = None
        self.export_lock = threading.Lock()

        # Running state
        self.is_running = False
        self.export_thread = None

        # Create export directory
        os.makedirs(self.export_dir, exist_ok=True)

        logger.info(f"Text Data Exporter initialized - Export dir: {self.export_dir}")
        logger.info(f"Symbols: MAIN={main_symbol}, REF1={ref1_symbol}, REF2={ref2_symbol}, REF3={ref3_symbol}")

    def start(self):
        """Start the data export process"""
        if self.is_running:
            logger.warning("Text data exporter already running")
            return

        self.is_running = True
        self.export_thread = threading.Thread(target=self._export_loop, daemon=True)
        self.export_thread.start()
        logger.info("Text data exporter started")

    def stop(self):
        """Stop the data export process"""
        self.is_running = False
        if self.export_thread:
            self.export_thread.join(timeout=5)
        logger.info("Text data exporter stopped")

    def _export_loop(self):
        """Main export loop - runs every second"""
        while self.is_running:
            try:
                self._export_current_data()
                time.sleep(1)  # Export every second
            except Exception as e:
                logger.error(f"Error in export loop: {e}")
                time.sleep(1)

    def _export_current_data(self):
        """Export current market data to the active text file"""
        try:
            current_time = datetime.now()
            current_minute_key = current_time.strftime("%Y%m%d_%H%M")

            # Check if we need a new file (new minute)
            if self.current_minute != current_minute_key:
                self.current_minute = current_minute_key
                self.current_filename = f"market_data_{current_minute_key}.txt"
                logger.info(f"Starting new export file: {self.current_filename}")

            # Gather data for all symbols and timeframes
            export_data = self._gather_export_data()

            if export_data:
                self._write_csv_file(export_data)
            else:
                logger.debug("No data available for export")

        except Exception as e:
            logger.error(f"Error exporting data: {e}")

    def _gather_export_data(self) -> List[Dict[str, Any]]:
        """Gather market data for all symbols and timeframes"""
        export_rows = []

        if not self.data_provider:
            return export_rows

        symbols = [
            ("MAIN", self.main_symbol),
            ("REF1", self.ref1_symbol),
            ("REF2", self.ref2_symbol),
            ("REF3", self.ref3_symbol)
        ]

        for symbol_type, symbol in symbols:
            for timeframe in self.timeframes:
                try:
                    # Get latest data for this symbol/timeframe
                    data_point = self._get_latest_data(symbol, timeframe)
                    if data_point:
                        export_rows.append({
                            'symbol_type': symbol_type,
                            'symbol': symbol,
                            'timeframe': timeframe,
                            'open': data_point.open,
                            'high': data_point.high,
                            'low': data_point.low,
                            'close': data_point.close,
                            'volume': data_point.volume,
                            'timestamp': data_point.timestamp
                        })
                except Exception as e:
                    logger.debug(f"Error getting data for {symbol} {timeframe}: {e}")

        return export_rows

    def _get_latest_data(self, symbol: str, timeframe: str) -> Optional[MarketDataPoint]:
        """Get latest market data for symbol/timeframe"""
        try:
            candle = None
            # Try direct method
            if hasattr(self.data_provider, 'get_latest_candle'):
                candle = self.data_provider.get_latest_candle(symbol, timeframe)
            # Fallback to historical last row
            if (not candle) and hasattr(self.data_provider, 'get_historical_data'):
                try:
                    df = self.data_provider.get_historical_data(symbol, timeframe, limit=1)
                    if df is not None and not df.empty:
                        latest = df.iloc[-1]
                        ts = df.index[-1]
                        candle = {
                            'open': latest.get('open', 0),
                            'high': latest.get('high', 0),
                            'low': latest.get('low', 0),
                            'close': latest.get('close', 0),
                            'volume': latest.get('volume', 0),
                            'timestamp': ts.to_pydatetime() if hasattr(ts, 'to_pydatetime') else ts
                        }
                except Exception as _ex:
                    logger.debug(f"hist fallback failed for {symbol} {timeframe}: {_ex}")
            if not candle:
                return None

            # Convert to MarketDataPoint
            return MarketDataPoint(
                symbol=symbol,
                timeframe=timeframe,
                open=float(candle.get('open', 0)),
                high=float(candle.get('high', 0)),
                low=float(candle.get('low', 0)),
                close=float(candle.get('close', 0)),
                volume=float(candle.get('volume', 0)),
                timestamp=candle.get('timestamp', datetime.now())
            )

        except Exception as e:
            logger.debug(f"Error getting latest data for {symbol} {timeframe}: {e}")
            return None

    def _write_csv_file(self, export_data: List[Dict[str, Any]]):
        """Write data to a TXT file in the configured tab- or pipe-delimited format"""
        if not export_data:
            return

        filepath = os.path.join(self.export_dir, self.current_filename)

        with self.export_lock:
            try:
                # Group data by symbol type for organized output
                grouped_data = self._group_data_by_symbol(export_data)

                with open(filepath, 'w', encoding='utf-8') as txtfile:
                    if self.export_format == 'TAB':
                        self._write_tab_format(txtfile, grouped_data)
                    else:
                        self._write_pipe_format(txtfile, grouped_data)

                logger.debug(f"Exported {len(export_data)} data points to {filepath}")

            except Exception as e:
                logger.error(f"Error writing TXT file {filepath}: {e}")
    def _create_csv_header(self) -> List[str]:
        """Create CSV header based on specification"""
        header = ['symbol']

        # Add columns for each symbol type and timeframe
        for symbol_type in ['MAIN', 'REF1', 'REF2', 'REF3']:
            for timeframe in self.timeframes:
                prefix = f"{symbol_type}_{timeframe}"
                header.extend([
                    f"{prefix}_O",  # Open
                    f"{prefix}_H",  # High
                    f"{prefix}_L",  # Low
                    f"{prefix}_C",  # Close
                    f"{prefix}_V",  # Volume
                    f"{prefix}_T"   # Timestamp
                ])

        return header
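    # Illustrative column layout from _create_csv_header (abbreviated):
    #   symbol, MAIN_1s_O, MAIN_1s_H, MAIN_1s_L, MAIN_1s_C, MAIN_1s_V, MAIN_1s_T,
    #   MAIN_1m_O, ..., REF3_1d_V, REF3_1d_T
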
    def _group_data_by_symbol(self, export_data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Dict[str, Any]]]:
        """Group data by symbol type and timeframe"""
        grouped = {}

        for data_point in export_data:
            symbol_type = data_point['symbol_type']
            timeframe = data_point['timeframe']

            if symbol_type not in grouped:
                grouped[symbol_type] = {}

            grouped[symbol_type][timeframe] = data_point

        return grouped

    def _format_csv_rows(self, grouped_data: Dict[str, Dict[str, Dict[str, Any]]]) -> List[Dict[str, Any]]:
        """Format data into CSV rows"""
        rows = []

        # Create a single row with all data
        row = {'symbol': f"{self.main_symbol.split('/')[0]} ({self.ref1_symbol.split('/')[0]}, {self.ref2_symbol}, {self.ref3_symbol.split('/')[0]})"}

        for symbol_type in ['MAIN', 'REF1', 'REF2', 'REF3']:
            symbol_data = grouped_data.get(symbol_type, {})

            for timeframe in self.timeframes:
                prefix = f"{symbol_type}_{timeframe}"
                data_point = symbol_data.get(timeframe)

                if data_point:
                    row[f"{prefix}_O"] = f"{data_point['open']:.6f}"
                    row[f"{prefix}_H"] = f"{data_point['high']:.6f}"
                    row[f"{prefix}_L"] = f"{data_point['low']:.6f}"
                    row[f"{prefix}_C"] = f"{data_point['close']:.6f}"
                    row[f"{prefix}_V"] = f"{data_point['volume']:.2f}"
                    row[f"{prefix}_T"] = data_point['timestamp'].strftime("%Y-%m-%d %H:%M:%S")
                else:
                    # Empty values if no data
                    for field in ('O', 'H', 'L', 'C', 'V', 'T'):
                        row[f"{prefix}_{field}"] = ""

        rows.append(row)
        return rows

    def _write_tab_format(self, txtfile, grouped_data: Dict[str, Dict[str, Dict[str, Any]]]):
        """Write data in tab-separated format like readme.md sample"""
        # Write header structure
        txtfile.write("symbol\tMAIN SYMBOL (ETH)\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tREF1 (BTC)\t\t\t\t\t\t\tREF2 (SPX)\t\t\t\t\t\t\tREF3 (SOL)\n")
        txtfile.write("timeframe\t1s\t\t\t\t\t\t1m\t\t\t\t\t\t1h\t\t\t\t\t\t1d\t\t\t\t\t\t1s\t\t\t\t\t\t1s\t\t\t\t\t\t1s\n")
        txtfile.write("datapoint\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\tO\tH\tL\tC\tV\tTimestamp\n")

        # Build up to 300 rows using historical 1s data when available
        # Collect timestamps from MAIN 1s historical data
        timestamps: List[Any] = []
        hist_df = None
        try:
            if hasattr(self.data_provider, 'get_historical_data'):
                hist_df = self.data_provider.get_historical_data(self.main_symbol, '1s', limit=300)
                if hist_df is not None and not hist_df.empty:
                    timestamps = list(hist_df.index[-300:])
        except Exception as _e:
            logger.debug(f"hist 1s not available: {_e}")
        if not timestamps:
            timestamps = [datetime.utcnow()]

        # Fetch snapshots for non-1s
        main_data = grouped_data.get('MAIN', {})
        main_1m = main_data.get('1m')
        main_1h = main_data.get('1h')
        main_1d = main_data.get('1d')
        ref1_1s = grouped_data.get('REF1', {}).get('1s')
        ref2_1s = grouped_data.get('REF2', {}).get('1s')
        ref3_1s = grouped_data.get('REF3', {}).get('1s')

        for ts in timestamps:
            try:
                ts_str = ts.strftime("%Y-%m-%dT%H:%M:%SZ") if hasattr(ts, 'strftime') else str(ts)
            except Exception:
                ts_str = ""
            row_parts = [ts_str]

            # MAIN 1s from hist row if possible
            if hist_df is not None and ts in hist_df.index:
                r = hist_df.loc[ts]
                row_parts.extend([
                    f"{float(r.get('open', 0) or 0):.2f}",
                    f"{float(r.get('high', 0) or 0):.2f}",
                    f"{float(r.get('low', 0) or 0):.2f}",
                    f"{float(r.get('close', 0) or 0):.2f}",
                    f"{float(r.get('volume', 0) or 0):.1f}",
                    ts_str
                ])
            else:
                snap = main_data.get('1s')
                if snap:
                    row_parts.extend([
                        f"{snap['open']:.2f}", f"{snap['high']:.2f}", f"{snap['low']:.2f}", f"{snap['close']:.2f}", f"{snap['volume']:.1f}", ts_str
                    ])
                else:
                    row_parts.extend(["0", "0", "0", "0", "0", ts_str])

            # MAIN 1m/1h/1d snapshots
            for snap in [main_1m, main_1h, main_1d]:
                if snap:
                    row_parts.extend([
                        f"{snap['open']:.2f}", f"{snap['high']:.2f}", f"{snap['low']:.2f}", f"{snap['close']:.2f}", f"{snap['volume']:.1f}", snap['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
                    ])
                else:
                    row_parts.extend(["0", "0", "0", "0", "0", ts_str])

            # REF1/REF2/REF3 1s snapshots
            for snap in [ref1_1s, ref2_1s, ref3_1s]:
                if snap:
                    row_parts.extend([
                        f"{snap['open']:.2f}", f"{snap['high']:.2f}", f"{snap['low']:.2f}", f"{snap['close']:.2f}", f"{snap['volume']:.1f}", snap['timestamp'].strftime("%Y-%m-%dT%H:%M:%SZ")
                    ])
                else:
                    row_parts.extend(["0", "0", "0", "0", "0", ts_str])

            txtfile.write("\t".join(row_parts) + "\n")

    def _write_pipe_format(self, txtfile, grouped_data: Dict[str, Dict[str, Dict[str, Any]]]):
        """Write data in pipe-delimited console-friendly grid."""
        from io import StringIO
        buffer = StringIO()
        self._write_tab_format(buffer, grouped_data)
        content = buffer.getvalue().splitlines()
        if not content:
            return
        headers = [line.split('\t') for line in content[:3]]
        data_rows = [line.split('\t') for line in content[3:]]

        def to_pipe(row: List[str]) -> str:
            return "|" + "|".join(str(col) for col in row) + "|"

        # Render header rows with separators
        for hdr in headers:
            txtfile.write(to_pipe(hdr) + "\n")
            txtfile.write("|" + "|".join(["-" * max(3, len(str(c))) for c in hdr]) + "|\n")

        # Render data
        for row in data_rows:
            txtfile.write(to_pipe(row) + "\n")
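
    # Illustrative pipe-format output (placeholder values, abbreviated; the
    # three header rows come verbatim from _write_tab_format above):
    #   |symbol|MAIN SYMBOL (ETH)|...|REF3 (SOL)|
    #   |timeframe|1s|...|1d|...|1s|
    #   |datapoint|O|H|L|C|V|Timestamp|...|
    #   |2025-01-01T00:00:00Z|100.00|101.00|99.00|100.50|12.3|...|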
    def get_current_filename(self) -> Optional[str]:
        """Get current export filename"""
        return self.current_filename

    def get_export_stats(self) -> Dict[str, Any]:
        """Get export statistics"""
        stats = {
            'is_running': self.is_running,
            'export_dir': self.export_dir,
            'current_filename': self.current_filename,
            'symbols': {
                'main': self.main_symbol,
                'ref1': self.ref1_symbol,
                'ref2': self.ref2_symbol,
                'ref3': self.ref3_symbol
            },
            'timeframes': self.timeframes
        }

        # Add file count
        try:
            files = [f for f in os.listdir(self.export_dir) if f.endswith('.txt')]
            stats['total_files'] = len(files)
        except OSError:
            stats['total_files'] = 0

        return stats

# Convenience function for integration
def create_text_exporter(data_provider=None, **kwargs) -> TextDataExporter:
    """Create and return a TextDataExporter instance"""
    return TextDataExporter(data_provider=data_provider, **kwargs)
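

# Minimal usage sketch, assuming only the duck-typed provider interface
# probed by _get_latest_data. StubProvider and the "txt_export_demo"
# directory are hypothetical stand-ins for a real data provider and
# output path; a real deployment would pass an actual provider instead.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    class StubProvider:
        """Hypothetical provider returning a fixed candle for every query."""

        def get_latest_candle(self, symbol, timeframe):
            return {
                'open': 100.0, 'high': 101.0, 'low': 99.0,
                'close': 100.5, 'volume': 12.3,
                'timestamp': datetime.now(),
            }

    exporter = create_text_exporter(StubProvider(), export_dir="txt_export_demo")
    exporter.start()
    time.sleep(3)  # let a few one-second export cycles run
    exporter.stop()
    print(exporter.get_export_stats())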