#!/usr/bin/env python3 """ LLM Proxy Model - Interface for LLM-based trading signals Sends market data to LLM endpoint and parses responses for trade signals """ import json import logging import requests import threading import time from datetime import datetime from typing import Dict, List, Optional, Any, Tuple from dataclasses import dataclass import os logger = logging.getLogger(__name__) @dataclass class LLMTradeSignal: """Trade signal from LLM""" symbol: str action: str # 'BUY', 'SELL', 'HOLD' confidence: float # 0.0 to 1.0 reasoning: str price_target: Optional[float] = None stop_loss: Optional[float] = None timestamp: Optional[datetime] = None @dataclass class LLMConfig: """LLM configuration""" base_url: str = "http://localhost:1234" model: str = "openai/gpt-oss-20b" temperature: float = 0.7 max_tokens: int = -1 timeout: int = 30 api_key: Optional[str] = None class LLMProxy: """ LLM Proxy for trading signal generation Features: - Configurable LLM endpoint and model - Processes market data from TextDataExporter files - Generates structured trading signals - Thread-safe operations - Error handling and retry logic """ def __init__(self, config: Optional[LLMConfig] = None, data_dir: str = "NN/training/samples/txt"): """ Initialize LLM proxy Args: config: LLM configuration data_dir: Directory to watch for market data files """ self.config = config or LLMConfig() self.data_dir = data_dir # Processing state self.is_running = False self.processing_thread = None self.processed_files = set() # Signal storage self.latest_signals: Dict[str, LLMTradeSignal] = {} self.signal_history: List[LLMTradeSignal] = [] self.lock = threading.Lock() # System prompt for trading self.system_prompt = """You are an expert cryptocurrency trading analyst. You will receive market data for ETH (main symbol) with reference data for BTC and SPX. Analyze the multi-timeframe data (1s, 1m, 1h, 1d) and provide trading recommendations. Respond ONLY with valid JSON in this format: { "action": "BUY|SELL|HOLD", "confidence": 0.0-1.0, "reasoning": "brief analysis", "price_target": number_or_null, "stop_loss": number_or_null } Consider market correlations, timeframe divergences, and risk management. """ logger.info(f"LLM Proxy initialized - Model: {self.config.model}") logger.info(f"Watching directory: {self.data_dir}") def start(self): """Start LLM processing""" if self.is_running: logger.warning("LLM proxy already running") return self.is_running = True self.processing_thread = threading.Thread(target=self._processing_loop, daemon=True) self.processing_thread.start() logger.info("LLM proxy started") def stop(self): """Stop LLM processing""" self.is_running = False if self.processing_thread: self.processing_thread.join(timeout=5) logger.info("LLM proxy stopped") def _processing_loop(self): """Main processing loop - checks for new files""" while self.is_running: try: self._check_for_new_files() time.sleep(5) # Check every 5 seconds except Exception as e: logger.error(f"Error in LLM processing loop: {e}") time.sleep(5) def _check_for_new_files(self): """Check for new market data files""" try: if not os.path.exists(self.data_dir): return txt_files = [f for f in os.listdir(self.data_dir) if f.endswith('.txt') and f.startswith('market_data_')] for filename in txt_files: if filename not in self.processed_files: filepath = os.path.join(self.data_dir, filename) self._process_file(filepath, filename) self.processed_files.add(filename) except Exception as e: logger.error(f"Error checking for new files: {e}") def _process_file(self, filepath: str, filename: str): """Process a market data file""" try: logger.info(f"Processing market data file: {filename}") # Read and parse market data market_data = self._parse_market_data(filepath) if not market_data: logger.warning(f"No valid market data in {filename}") return # Generate LLM prompt prompt = self._create_trading_prompt(market_data) # Send to LLM response = self._query_llm(prompt) if not response: logger.warning(f"No response from LLM for {filename}") return # Parse response signal = self._parse_llm_response(response, market_data) if signal: with self.lock: self.latest_signals['ETH'] = signal self.signal_history.append(signal) # Keep only last 100 signals if len(self.signal_history) > 100: self.signal_history = self.signal_history[-100:] logger.info(f"Generated signal: {signal.action} ({signal.confidence:.2f}) - {signal.reasoning}") except Exception as e: logger.error(f"Error processing file {filename}: {e}") def _parse_market_data(self, filepath: str) -> Optional[Dict[str, Any]]: """Parse market data from text file""" try: with open(filepath, 'r', encoding='utf-8') as f: lines = f.readlines() if len(lines) < 4: # Need header + data return None # Find data line (skip headers) data_line = None for line in lines[3:]: # Skip the 3 header lines if line.strip() and not line.startswith('symbol'): data_line = line.strip() break if not data_line: return None # Parse tab-separated data parts = data_line.split('\t') if len(parts) < 25: # Need minimum data return None # Extract structured data parsed_data = { 'timestamp': parts[0], 'eth_1s': self._extract_ohlcv(parts[1:7]), 'eth_1m': self._extract_ohlcv(parts[7:13]), 'eth_1h': self._extract_ohlcv(parts[13:19]), 'eth_1d': self._extract_ohlcv(parts[19:25]), 'btc_1s': self._extract_ohlcv(parts[25:31]) if len(parts) > 25 else None, 'spx_1s': self._extract_ohlcv(parts[31:37]) if len(parts) > 31 else None } return parsed_data except Exception as e: logger.error(f"Error parsing market data: {e}") return None def _extract_ohlcv(self, data_parts: List[str]) -> Dict[str, float]: """Extract OHLCV data from parts""" try: return { 'open': float(data_parts[0]) if data_parts[0] != '0' else 0.0, 'high': float(data_parts[1]) if data_parts[1] != '0' else 0.0, 'low': float(data_parts[2]) if data_parts[2] != '0' else 0.0, 'close': float(data_parts[3]) if data_parts[3] != '0' else 0.0, 'volume': float(data_parts[4]) if data_parts[4] != '0' else 0.0, 'timestamp': data_parts[5] } except (ValueError, IndexError): return {'open': 0.0, 'high': 0.0, 'low': 0.0, 'close': 0.0, 'volume': 0.0, 'timestamp': ''} def _create_trading_prompt(self, market_data: Dict[str, Any]) -> str: """Create trading prompt from market data""" prompt = f"""Market Data Analysis for ETH/USDT: Timestamp: {market_data['timestamp']} ETH Multi-timeframe Data: 1s: O:{market_data['eth_1s']['open']:.2f} H:{market_data['eth_1s']['high']:.2f} L:{market_data['eth_1s']['low']:.2f} C:{market_data['eth_1s']['close']:.2f} V:{market_data['eth_1s']['volume']:.1f} 1m: O:{market_data['eth_1m']['open']:.2f} H:{market_data['eth_1m']['high']:.2f} L:{market_data['eth_1m']['low']:.2f} C:{market_data['eth_1m']['close']:.2f} V:{market_data['eth_1m']['volume']:.1f} 1h: O:{market_data['eth_1h']['open']:.2f} H:{market_data['eth_1h']['high']:.2f} L:{market_data['eth_1h']['low']:.2f} C:{market_data['eth_1h']['close']:.2f} V:{market_data['eth_1h']['volume']:.1f} 1d: O:{market_data['eth_1d']['open']:.2f} H:{market_data['eth_1d']['high']:.2f} L:{market_data['eth_1d']['low']:.2f} C:{market_data['eth_1d']['close']:.2f} V:{market_data['eth_1d']['volume']:.1f} """ if market_data.get('btc_1s'): prompt += f"\nBTC Reference (1s): O:{market_data['btc_1s']['open']:.2f} H:{market_data['btc_1s']['high']:.2f} L:{market_data['btc_1s']['low']:.2f} C:{market_data['btc_1s']['close']:.2f} V:{market_data['btc_1s']['volume']:.1f}" if market_data.get('spx_1s'): prompt += f"\nSPX Reference (1s): O:{market_data['spx_1s']['open']:.2f} H:{market_data['spx_1s']['high']:.2f} L:{market_data['spx_1s']['low']:.2f} C:{market_data['spx_1s']['close']:.2f}" prompt += "\n\nProvide trading recommendation based on this multi-timeframe analysis." return prompt def _query_llm(self, prompt: str) -> Optional[str]: """Send query to LLM endpoint""" try: url = f"{self.config.base_url}/v1/chat/completions" headers = { "Content-Type": "application/json" } if self.config.api_key: headers["Authorization"] = f"Bearer {self.config.api_key}" payload = { "model": self.config.model, "messages": [ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": prompt} ], "temperature": self.config.temperature, "max_tokens": self.config.max_tokens, "stream": False } response = requests.post( url, headers=headers, data=json.dumps(payload), timeout=self.config.timeout ) if response.status_code == 200: result = response.json() if 'choices' in result and len(result['choices']) > 0: return result['choices'][0]['message']['content'] else: logger.error(f"LLM API error: {response.status_code} - {response.text}") return None except Exception as e: logger.error(f"Error querying LLM: {e}") return None def _parse_llm_response(self, response: str, market_data: Dict[str, Any]) -> Optional[LLMTradeSignal]: """Parse LLM response into trade signal""" try: # Try to extract JSON from response response = response.strip() if response.startswith('```json'): response = response[7:] if response.endswith('```'): response = response[:-3] # Parse JSON data = json.loads(response) # Validate required fields if 'action' not in data or 'confidence' not in data: logger.warning("LLM response missing required fields") return None # Create signal signal = LLMTradeSignal( symbol='ETH/USDT', action=data['action'].upper(), confidence=float(data['confidence']), reasoning=data.get('reasoning', ''), price_target=data.get('price_target'), stop_loss=data.get('stop_loss'), timestamp=datetime.now() ) # Validate action if signal.action not in ['BUY', 'SELL', 'HOLD']: logger.warning(f"Invalid action: {signal.action}") return None # Validate confidence signal.confidence = max(0.0, min(1.0, signal.confidence)) return signal except Exception as e: logger.error(f"Error parsing LLM response: {e}") logger.debug(f"Response was: {response}") return None def get_latest_signal(self, symbol: str = 'ETH') -> Optional[LLMTradeSignal]: """Get latest trading signal for symbol""" with self.lock: return self.latest_signals.get(symbol) def get_signal_history(self, limit: int = 10) -> List[LLMTradeSignal]: """Get recent signal history""" with self.lock: return self.signal_history[-limit:] if self.signal_history else [] def update_config(self, config: LLMConfig): """Update LLM configuration""" self.config = config logger.info(f"LLM config updated - Model: {self.config.model}, Base URL: {self.config.base_url}") def get_status(self) -> Dict[str, Any]: """Get LLM proxy status""" with self.lock: return { 'is_running': self.is_running, 'config': { 'base_url': self.config.base_url, 'model': self.config.model, 'temperature': self.config.temperature }, 'processed_files': len(self.processed_files), 'total_signals': len(self.signal_history), 'latest_signals': {k: { 'action': v.action, 'confidence': v.confidence, 'timestamp': v.timestamp.isoformat() if v.timestamp else None } for k, v in self.latest_signals.items()} } # Convenience functions def create_llm_proxy(config: Optional[LLMConfig] = None, **kwargs) -> LLMProxy: """Create LLM proxy instance""" return LLMProxy(config=config, **kwargs) def create_llm_config(base_url: str = "http://localhost:1234", model: str = "openai/gpt-oss-20b", **kwargs) -> LLMConfig: """Create LLM configuration""" return LLMConfig(base_url=base_url, model=model, **kwargs)