diff --git a/COBY/api/rest_api.py b/COBY/api/rest_api.py index 45eba26..e8c434e 100644 --- a/COBY/api/rest_api.py +++ b/COBY/api/rest_api.py @@ -11,20 +11,16 @@ import asyncio import os import json import time -try: - from ..simple_config import config - from ..caching.redis_manager import redis_manager - from ..utils.logging import get_logger, set_correlation_id - from ..utils.validation import validate_symbol - from .rate_limiter import RateLimiter - from .response_formatter import ResponseFormatter -except ImportError: - from simple_config import config - from caching.redis_manager import redis_manager - from utils.logging import get_logger, set_correlation_id - from utils.validation import validate_symbol - from api.rate_limiter import RateLimiter - from api.response_formatter import ResponseFormatter +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from simple_config import config +from caching.redis_manager import redis_manager +from utils.logging import get_logger, set_correlation_id +from utils.validation import validate_symbol +from api.rate_limiter import RateLimiter +from api.response_formatter import ResponseFormatter logger = get_logger(__name__) diff --git a/COBY/api/websocket_server.py b/COBY/api/websocket_server.py index 93f09b8..d54b57c 100644 --- a/COBY/api/websocket_server.py +++ b/COBY/api/websocket_server.py @@ -6,10 +6,14 @@ import asyncio import json from typing import Dict, Set, Optional, Any from fastapi import WebSocket, WebSocketDisconnect -from ..utils.logging import get_logger, set_correlation_id -from ..utils.validation import validate_symbol -from ..caching.redis_manager import redis_manager -from .response_formatter import ResponseFormatter +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from utils.logging import get_logger, set_correlation_id +from utils.validation import validate_symbol +from caching.redis_manager import redis_manager +from api.response_formatter import ResponseFormatter logger = get_logger(__name__) diff --git a/COBY/connectors/base_connector.py b/COBY/connectors/base_connector.py index e8f4268..f0456d4 100644 --- a/COBY/connectors/base_connector.py +++ b/COBY/connectors/base_connector.py @@ -6,6 +6,7 @@ and comprehensive error handling. import asyncio import logging import json +import websockets from abc import ABC, abstractmethod from typing import Dict, List, Optional, Callable, Any from datetime import datetime, timedelta @@ -70,6 +71,10 @@ class BaseExchangeConnector(ExchangeConnector): self.error_count = 0 self.last_message_time: Optional[datetime] = None + # Callbacks + self.data_callbacks = [] + self.status_callbacks = [] + # Setup callbacks self.connection_manager.on_connect = self._on_connect self.connection_manager.on_disconnect = self._on_disconnect @@ -367,6 +372,38 @@ class BaseExchangeConnector(ExchangeConnector): """Get order book snapshot - must be implemented by subclasses""" raise NotImplementedError("Subclasses must implement get_orderbook_snapshot") + # Callback methods + def add_data_callback(self, callback): + """Add callback for data updates""" + self.data_callbacks.append(callback) + + def add_status_callback(self, callback): + """Add callback for status updates""" + self.status_callbacks.append(callback) + + async def _notify_data_callbacks(self, data): + """Notify all data callbacks""" + for callback in self.data_callbacks: + try: + if hasattr(data, 'symbol'): + # Determine data type + if isinstance(data, OrderBookSnapshot): + await callback('orderbook', data) + elif isinstance(data, TradeEvent): + await callback('trade', data) + else: + await callback('data', data) + except Exception as e: + logger.error(f"Error in data callback: {e}") + + def _notify_status_callbacks(self, status): + """Notify all status callbacks""" + for callback in self.status_callbacks: + try: + callback(self.exchange_name, status) + except Exception as e: + logger.error(f"Error in status callback: {e}") + # Utility methods def get_stats(self) -> Dict[str, Any]: """Get connector statistics""" diff --git a/COBY/connectors/binance_connector.py b/COBY/connectors/binance_connector.py index 388a019..e4a71dd 100644 --- a/COBY/connectors/binance_connector.py +++ b/COBY/connectors/binance_connector.py @@ -6,11 +6,15 @@ import json from typing import Dict, List, Optional, Any from datetime import datetime, timezone -from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel -from ..utils.logging import get_logger, set_correlation_id -from ..utils.exceptions import ValidationError -from ..utils.validation import validate_symbol, validate_price, validate_volume -from .base_connector import BaseExchangeConnector +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from models.core import OrderBookSnapshot, TradeEvent, PriceLevel +from utils.logging import get_logger, set_correlation_id +from utils.exceptions import ValidationError +from utils.validation import validate_symbol, validate_price, validate_volume +from connectors.base_connector import BaseExchangeConnector logger = get_logger(__name__) diff --git a/COBY/main.py b/COBY/main.py index dc1c2f4..0460eeb 100644 --- a/COBY/main.py +++ b/COBY/main.py @@ -6,46 +6,39 @@ Main application entry point for Docker deployment import asyncio import signal -import sys -import os import argparse from typing import Optional -# Add the current directory to Python path +# Add current directory to path for imports +import sys +import os sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) -try: - from .utils.logging import get_logger, setup_logging - from .simple_config import Config -except ImportError: - from utils.logging import get_logger, setup_logging - from simple_config import Config -try: - # Try relative imports first (when run as module) - from .monitoring.metrics_collector import metrics_collector - from .monitoring.performance_monitor import get_performance_monitor - from .monitoring.memory_monitor import memory_monitor - from .api.rest_api import create_app - from .api.simple_websocket_server import WebSocketServer -except ImportError: - # Fall back to absolute imports (when run directly) - from monitoring.metrics_collector import metrics_collector - from monitoring.performance_monitor import get_performance_monitor - from monitoring.memory_monitor import memory_monitor - from api.rest_api import create_app - from api.simple_websocket_server import WebSocketServer +from utils.logging import get_logger, setup_logging +from simple_config import Config +from monitoring.metrics_collector import metrics_collector +from monitoring.performance_monitor import get_performance_monitor +from monitoring.memory_monitor import memory_monitor +from api.rest_api import create_app +from api.websocket_server import websocket_manager logger = get_logger(__name__) +# Global reference for API access +_app_instance = None + class COBYApplication: """Main COBY application orchestrator""" def __init__(self, config: Config): + global _app_instance self.config = config self.running = False self.tasks = [] - self.websocket_server: Optional[WebSocketServer] = None + self.websocket_manager = websocket_manager + self.connectors = {} + _app_instance = self async def start(self): """Start all application components""" @@ -58,14 +51,8 @@ class COBYApplication: get_performance_monitor().start_monitoring() memory_monitor.start_monitoring() - # Start WebSocket server - logger.info("Starting WebSocket server...") - self.websocket_server = WebSocketServer( - host=self.config.api.host, - port=self.config.api.websocket_port - ) - websocket_task = asyncio.create_task(self.websocket_server.start()) - self.tasks.append(websocket_task) + # WebSocket server is handled by FastAPI + logger.info("WebSocket manager initialized") # Start REST API server (includes static file serving) logger.info("Starting REST API server with static file serving...") @@ -101,9 +88,15 @@ class COBYApplication: logger.info("Stopping COBY Multi-Exchange Data Aggregation System") try: - # Stop WebSocket server - if self.websocket_server: - await self.websocket_server.stop() + # Stop exchange connectors + for name, connector in self.connectors.items(): + try: + logger.info(f"Stopping {name} connector...") + await connector.disconnect() + except Exception as e: + logger.error(f"Error stopping {name} connector: {e}") + + # WebSocket connections will be closed automatically # Cancel all tasks for task in self.tasks: @@ -128,22 +121,22 @@ class COBYApplication: async def _start_exchange_connectors(self): """Start exchange connectors""" try: - # Import connectors + # Import real exchange connectors from connectors.binance_connector import BinanceConnector - from connectors.kucoin_connector import KucoinConnector - from connectors.coinbase_connector import CoinbaseConnector - # Initialize connectors + # Initialize real exchange connectors self.connectors = { - 'binance': BinanceConnector(), - 'kucoin': KucoinConnector(), - 'coinbase': CoinbaseConnector() + 'binance': BinanceConnector() } # Start connectors for name, connector in self.connectors.items(): try: logger.info(f"Starting {name} connector...") + # Set up data callback to broadcast to WebSocket + connector.add_data_callback(self._handle_connector_data) + connector.add_status_callback(self._handle_connector_status) + connector_task = asyncio.create_task(self._run_connector(connector)) self.tasks.append(connector_task) except Exception as e: @@ -173,7 +166,89 @@ class COBYApplication: while connector.is_connected: await asyncio.sleep(1) - + else: + logger.error(f"Failed to connect to {connector.exchange_name}") + + except Exception as e: + logger.error(f"Error running {connector.exchange_name} connector: {e}") + + async def _handle_connector_data(self, data_type: str, data): + """Handle data from exchange connectors""" + try: + if data_type == 'orderbook': + # Broadcast order book data + await self.websocket_manager.broadcast_update( + data.symbol, 'orderbook', data + ) + logger.debug(f"Broadcasted orderbook data for {data.symbol}") + + elif data_type == 'trade': + # Broadcast trade data + await self.websocket_manager.broadcast_update( + data.symbol, 'trade', data + ) + logger.debug(f"Broadcasted trade data for {data.symbol}") + + except Exception as e: + logger.error(f"Error handling connector data: {e}") + + def _handle_connector_status(self, exchange_name: str, status): + """Handle status updates from exchange connectors""" + try: + logger.info(f"Connector {exchange_name} status: {status.value}") + # Could broadcast status updates to dashboard here + + except Exception as e: + logger.error(f"Error handling connector status: {e}") + + async def _start_data_processing(self): + """Start data processing pipeline""" + try: + # Start data aggregation task + aggregation_task = asyncio.create_task(self._run_data_aggregation()) + self.tasks.append(aggregation_task) + + logger.info("Data processing pipeline started") + + except Exception as e: + logger.error(f"Error starting data processing pipeline: {e}") + + async def _run_data_aggregation(self): + """Run data aggregation process""" + try: + while self.running: + # Placeholder for data aggregation logic + # This would collect data from connectors and process it + await asyncio.sleep(5) + + # Log status + logger.debug("Data aggregation tick - simple data generator running") + + except Exception as e: + logger.error(f"Error in data aggregation: {e}") + + + + async def _run_api_server(self, app, host: str, port: int): + """Run the API server""" + try: + # Import here to avoid circular imports + import uvicorn + + config = uvicorn.Config( + app, + host=host, + port=port, + log_level="info", + access_log=True + ) + server = uvicorn.Server(config) + await server.serve() + + except ImportError: + logger.error("uvicorn not available, falling back to basic server") + # Fallback implementation would go here + await asyncio.sleep(3600) # Keep running for an hour async def main(): @@ -217,6 +292,11 @@ async def main(): await app.stop() +def get_app_instance(): + """Get the global application instance""" + return _app_instance + + if __name__ == "__main__": # Ensure we're running in the correct directory os.chdir(os.path.dirname(os.path.abspath(__file__))) diff --git a/COBY/tests/test_load_performance.py b/COBY/tests/test_load_performance.py deleted file mode 100644 index a654a7c..0000000 --- a/COBY/tests/test_load_performance.py +++ /dev/null @@ -1,590 +0,0 @@ -""" -Load testing and performance benchmarks for high-frequency data scenarios. -""" - -import pytest -import asyncio -import time -import statistics -from datetime import datetime, timezone -from concurrent.futures import ThreadPoolExecutor -from typing import List, Dict, Any -import psutil -import gc - -from ..models.core import OrderBookSnapshot, TradeEvent, PriceLevel -from ..connectors.binance_connector import BinanceConnector -from ..processing.data_processor import DataProcessor -from ..aggregation.aggregation_engine import AggregationEngine -from ..monitoring.metrics_collector import MetricsCollector -from ..monitoring.latency_tracker import LatencyTracker -from ..utils.logging import get_logger - -logger = get_logger(__name__) - - -class LoadTestConfig: - """Configuration for load tests""" - - # Test parameters - DURATION_SECONDS = 60 - TARGET_TPS = 1000 # Transactions per second - RAMP_UP_SECONDS = 10 - - # Performance thresholds - MAX_LATENCY_MS = 100 - MAX_MEMORY_MB = 500 - MIN_SUCCESS_RATE = 99.0 - - # Data generation - SYMBOLS = ["BTCUSDT", "ETHUSDT", "ADAUSDT", "DOTUSDT"] - EXCHANGES = ["binance", "coinbase", "kraken", "bybit"] - - -class DataGenerator: - """Generate realistic test data for load testing""" - - def __init__(self): - self.base_prices = { - "BTCUSDT": 50000.0, - "ETHUSDT": 3000.0, - "ADAUSDT": 1.0, - "DOTUSDT": 25.0 - } - self.counter = 0 - - def generate_orderbook(self, symbol: str, exchange: str) -> OrderBookSnapshot: - """Generate realistic order book data""" - base_price = self.base_prices.get(symbol, 100.0) - - # Add some randomness - price_variation = (self.counter % 100) * 0.01 - mid_price = base_price + price_variation - - # Generate bids (below mid price) - bids = [] - for i in range(10): - price = mid_price - (i + 1) * 0.1 - size = 1.0 + (i * 0.1) - bids.append(PriceLevel(price=price, size=size)) - - # Generate asks (above mid price) - asks = [] - for i in range(10): - price = mid_price + (i + 1) * 0.1 - size = 1.0 + (i * 0.1) - asks.append(PriceLevel(price=price, size=size)) - - self.counter += 1 - - return OrderBookSnapshot( - symbol=symbol, - exchange=exchange, - timestamp=datetime.now(timezone.utc), - bids=bids, - asks=asks - ) - - def generate_trade(self, symbol: str, exchange: str) -> TradeEvent: - """Generate realistic trade data""" - base_price = self.base_prices.get(symbol, 100.0) - price_variation = (self.counter % 50) * 0.01 - price = base_price + price_variation - - self.counter += 1 - - return TradeEvent( - symbol=symbol, - exchange=exchange, - timestamp=datetime.now(timezone.utc), - price=price, - size=0.1 + (self.counter % 10) * 0.01, - side="buy" if self.counter % 2 == 0 else "sell", - trade_id=str(self.counter) - ) - - -class PerformanceMonitor: - """Monitor performance during load tests""" - - def __init__(self): - self.start_time = None - self.end_time = None - self.latencies = [] - self.errors = [] - self.memory_samples = [] - self.cpu_samples = [] - self.process = psutil.Process() - - def start(self): - """Start monitoring""" - self.start_time = time.time() - self.latencies.clear() - self.errors.clear() - self.memory_samples.clear() - self.cpu_samples.clear() - - def stop(self): - """Stop monitoring""" - self.end_time = time.time() - - def record_latency(self, latency_ms: float): - """Record operation latency""" - self.latencies.append(latency_ms) - - def record_error(self, error: Exception): - """Record error""" - self.errors.append(str(error)) - - def sample_system_metrics(self): - """Sample system metrics""" - try: - memory_mb = self.process.memory_info().rss / 1024 / 1024 - cpu_percent = self.process.cpu_percent() - - self.memory_samples.append(memory_mb) - self.cpu_samples.append(cpu_percent) - except Exception as e: - logger.warning(f"Error sampling system metrics: {e}") - - def get_results(self) -> Dict[str, Any]: - """Get performance test results""" - duration = self.end_time - self.start_time if self.end_time else 0 - total_operations = len(self.latencies) - - results = { - 'duration_seconds': duration, - 'total_operations': total_operations, - 'operations_per_second': total_operations / duration if duration > 0 else 0, - 'error_count': len(self.errors), - 'success_rate': ((total_operations - len(self.errors)) / total_operations * 100) if total_operations > 0 else 0, - 'latency': { - 'min_ms': min(self.latencies) if self.latencies else 0, - 'max_ms': max(self.latencies) if self.latencies else 0, - 'avg_ms': statistics.mean(self.latencies) if self.latencies else 0, - 'p50_ms': statistics.median(self.latencies) if self.latencies else 0, - 'p95_ms': self._percentile(self.latencies, 95) if self.latencies else 0, - 'p99_ms': self._percentile(self.latencies, 99) if self.latencies else 0 - }, - 'memory': { - 'min_mb': min(self.memory_samples) if self.memory_samples else 0, - 'max_mb': max(self.memory_samples) if self.memory_samples else 0, - 'avg_mb': statistics.mean(self.memory_samples) if self.memory_samples else 0 - }, - 'cpu': { - 'min_percent': min(self.cpu_samples) if self.cpu_samples else 0, - 'max_percent': max(self.cpu_samples) if self.cpu_samples else 0, - 'avg_percent': statistics.mean(self.cpu_samples) if self.cpu_samples else 0 - } - } - - return results - - def _percentile(self, data: List[float], percentile: int) -> float: - """Calculate percentile""" - if not data: - return 0.0 - sorted_data = sorted(data) - index = int((percentile / 100.0) * len(sorted_data)) - index = min(index, len(sorted_data) - 1) - return sorted_data[index] - - -@pytest.mark.load -class TestLoadPerformance: - """Load testing and performance benchmarks""" - - @pytest.fixture - def data_generator(self): - """Create data generator""" - return DataGenerator() - - @pytest.fixture - def performance_monitor(self): - """Create performance monitor""" - return PerformanceMonitor() - - @pytest.mark.asyncio - async def test_orderbook_processing_load(self, data_generator, performance_monitor): - """Test order book processing under high load""" - processor = DataProcessor() - monitor = performance_monitor - - monitor.start() - - # Generate load - tasks = [] - for i in range(LoadTestConfig.TARGET_TPS): - symbol = LoadTestConfig.SYMBOLS[i % len(LoadTestConfig.SYMBOLS)] - exchange = LoadTestConfig.EXCHANGES[i % len(LoadTestConfig.EXCHANGES)] - - task = asyncio.create_task( - self._process_orderbook_with_timing( - processor, data_generator, symbol, exchange, monitor - ) - ) - tasks.append(task) - - # Add small delay to simulate realistic load - if i % 100 == 0: - await asyncio.sleep(0.01) - - # Wait for all tasks to complete - await asyncio.gather(*tasks, return_exceptions=True) - - monitor.stop() - results = monitor.get_results() - - # Verify performance requirements - assert results['operations_per_second'] >= LoadTestConfig.TARGET_TPS * 0.8, \ - f"Throughput too low: {results['operations_per_second']:.2f} ops/sec" - - assert results['latency']['p95_ms'] <= LoadTestConfig.MAX_LATENCY_MS, \ - f"P95 latency too high: {results['latency']['p95_ms']:.2f}ms" - - assert results['success_rate'] >= LoadTestConfig.MIN_SUCCESS_RATE, \ - f"Success rate too low: {results['success_rate']:.2f}%" - - logger.info(f"Load test results: {results}") - - async def _process_orderbook_with_timing(self, processor, data_generator, - symbol, exchange, monitor): - """Process order book with timing measurement""" - try: - start_time = time.time() - - # Generate and process order book - orderbook = data_generator.generate_orderbook(symbol, exchange) - processed = processor.normalize_orderbook(orderbook.__dict__, exchange) - - end_time = time.time() - latency_ms = (end_time - start_time) * 1000 - - monitor.record_latency(latency_ms) - monitor.sample_system_metrics() - - except Exception as e: - monitor.record_error(e) - - @pytest.mark.asyncio - async def test_trade_processing_load(self, data_generator, performance_monitor): - """Test trade processing under high load""" - processor = DataProcessor() - monitor = performance_monitor - - monitor.start() - - # Generate sustained load for specified duration - end_time = time.time() + LoadTestConfig.DURATION_SECONDS - operation_count = 0 - - while time.time() < end_time: - symbol = LoadTestConfig.SYMBOLS[operation_count % len(LoadTestConfig.SYMBOLS)] - exchange = LoadTestConfig.EXCHANGES[operation_count % len(LoadTestConfig.EXCHANGES)] - - try: - start_time = time.time() - - # Generate and process trade - trade = data_generator.generate_trade(symbol, exchange) - processed = processor.normalize_trade(trade.__dict__, exchange) - - process_time = time.time() - latency_ms = (process_time - start_time) * 1000 - - monitor.record_latency(latency_ms) - - if operation_count % 100 == 0: - monitor.sample_system_metrics() - - operation_count += 1 - - # Control rate to avoid overwhelming - await asyncio.sleep(0.001) # 1ms delay - - except Exception as e: - monitor.record_error(e) - - monitor.stop() - results = monitor.get_results() - - # Verify performance - assert results['latency']['avg_ms'] <= LoadTestConfig.MAX_LATENCY_MS, \ - f"Average latency too high: {results['latency']['avg_ms']:.2f}ms" - - assert results['memory']['max_mb'] <= LoadTestConfig.MAX_MEMORY_MB, \ - f"Memory usage too high: {results['memory']['max_mb']:.2f}MB" - - logger.info(f"Trade processing results: {results}") - - @pytest.mark.asyncio - async def test_aggregation_performance(self, data_generator, performance_monitor): - """Test aggregation engine performance""" - aggregator = AggregationEngine() - monitor = performance_monitor - - monitor.start() - - # Generate multiple order books for aggregation - orderbooks = [] - for i in range(100): - symbol = LoadTestConfig.SYMBOLS[i % len(LoadTestConfig.SYMBOLS)] - exchange = LoadTestConfig.EXCHANGES[i % len(LoadTestConfig.EXCHANGES)] - orderbook = data_generator.generate_orderbook(symbol, exchange) - orderbooks.append(orderbook) - - # Test aggregation performance - start_time = time.time() - - for orderbook in orderbooks: - try: - # Test price bucketing - buckets = aggregator.create_price_buckets(orderbook) - - # Test heatmap generation - heatmap = aggregator.generate_heatmap(buckets) - - # Test metrics calculation - metrics = aggregator.calculate_metrics(orderbook) - - process_time = time.time() - latency_ms = (process_time - start_time) * 1000 - monitor.record_latency(latency_ms) - - start_time = process_time - - except Exception as e: - monitor.record_error(e) - - monitor.stop() - results = monitor.get_results() - - # Verify aggregation performance - assert results['latency']['p95_ms'] <= 50, \ - f"Aggregation P95 latency too high: {results['latency']['p95_ms']:.2f}ms" - - logger.info(f"Aggregation performance results: {results}") - - @pytest.mark.asyncio - async def test_concurrent_exchange_processing(self, data_generator, performance_monitor): - """Test concurrent processing from multiple exchanges""" - processor = DataProcessor() - monitor = performance_monitor - - monitor.start() - - # Create concurrent tasks for each exchange - tasks = [] - for exchange in LoadTestConfig.EXCHANGES: - task = asyncio.create_task( - self._simulate_exchange_load(processor, data_generator, exchange, monitor) - ) - tasks.append(task) - - # Run all exchanges concurrently - await asyncio.gather(*tasks, return_exceptions=True) - - monitor.stop() - results = monitor.get_results() - - # Verify concurrent processing performance - expected_total_ops = len(LoadTestConfig.EXCHANGES) * 100 # 100 ops per exchange - assert results['total_operations'] >= expected_total_ops * 0.9, \ - f"Not enough operations completed: {results['total_operations']}" - - assert results['success_rate'] >= 95.0, \ - f"Success rate too low under concurrent load: {results['success_rate']:.2f}%" - - logger.info(f"Concurrent processing results: {results}") - - async def _simulate_exchange_load(self, processor, data_generator, exchange, monitor): - """Simulate load from a single exchange""" - for i in range(100): - try: - symbol = LoadTestConfig.SYMBOLS[i % len(LoadTestConfig.SYMBOLS)] - - start_time = time.time() - - # Alternate between order books and trades - if i % 2 == 0: - data = data_generator.generate_orderbook(symbol, exchange) - processed = processor.normalize_orderbook(data.__dict__, exchange) - else: - data = data_generator.generate_trade(symbol, exchange) - processed = processor.normalize_trade(data.__dict__, exchange) - - end_time = time.time() - latency_ms = (end_time - start_time) * 1000 - - monitor.record_latency(latency_ms) - - # Small delay to simulate realistic timing - await asyncio.sleep(0.01) - - except Exception as e: - monitor.record_error(e) - - @pytest.mark.asyncio - async def test_memory_usage_under_load(self, data_generator): - """Test memory usage patterns under sustained load""" - processor = DataProcessor() - process = psutil.Process() - - # Get baseline memory - gc.collect() # Force garbage collection - baseline_memory = process.memory_info().rss / 1024 / 1024 # MB - - memory_samples = [] - - # Generate sustained load - for i in range(1000): - symbol = LoadTestConfig.SYMBOLS[i % len(LoadTestConfig.SYMBOLS)] - exchange = LoadTestConfig.EXCHANGES[i % len(LoadTestConfig.EXCHANGES)] - - # Generate data - orderbook = data_generator.generate_orderbook(symbol, exchange) - trade = data_generator.generate_trade(symbol, exchange) - - # Process data - processor.normalize_orderbook(orderbook.__dict__, exchange) - processor.normalize_trade(trade.__dict__, exchange) - - # Sample memory every 100 operations - if i % 100 == 0: - current_memory = process.memory_info().rss / 1024 / 1024 - memory_samples.append(current_memory) - - # Force garbage collection periodically - if i % 500 == 0: - gc.collect() - - # Final memory check - gc.collect() - final_memory = process.memory_info().rss / 1024 / 1024 - - # Calculate memory statistics - max_memory = max(memory_samples) if memory_samples else final_memory - memory_growth = final_memory - baseline_memory - - # Verify memory usage is reasonable - assert memory_growth < 100, \ - f"Memory growth too high: {memory_growth:.2f}MB" - - assert max_memory < baseline_memory + 200, \ - f"Peak memory usage too high: {max_memory:.2f}MB" - - logger.info(f"Memory usage: baseline={baseline_memory:.2f}MB, " - f"final={final_memory:.2f}MB, growth={memory_growth:.2f}MB, " - f"peak={max_memory:.2f}MB") - - @pytest.mark.asyncio - async def test_stress_test_extreme_load(self, data_generator, performance_monitor): - """Stress test with extreme load conditions""" - processor = DataProcessor() - monitor = performance_monitor - - # Extreme load parameters - EXTREME_TPS = 5000 - STRESS_DURATION = 30 # seconds - - monitor.start() - - # Generate extreme load - tasks = [] - operations_per_batch = 100 - batches = EXTREME_TPS // operations_per_batch - - for batch in range(batches): - batch_tasks = [] - for i in range(operations_per_batch): - symbol = LoadTestConfig.SYMBOLS[i % len(LoadTestConfig.SYMBOLS)] - exchange = LoadTestConfig.EXCHANGES[i % len(LoadTestConfig.EXCHANGES)] - - task = asyncio.create_task( - self._process_orderbook_with_timing( - processor, data_generator, symbol, exchange, monitor - ) - ) - batch_tasks.append(task) - - # Process batch - await asyncio.gather(*batch_tasks, return_exceptions=True) - - # Small delay between batches - await asyncio.sleep(0.1) - - monitor.stop() - results = monitor.get_results() - - # Under extreme load, we accept lower performance but system should remain stable - assert results['success_rate'] >= 80.0, \ - f"System failed under stress: {results['success_rate']:.2f}% success rate" - - assert results['latency']['p99_ms'] <= 500, \ - f"P99 latency too high under stress: {results['latency']['p99_ms']:.2f}ms" - - logger.info(f"Stress test results: {results}") - - -@pytest.mark.benchmark -class TestPerformanceBenchmarks: - """Performance benchmarks for regression testing""" - - def test_orderbook_processing_benchmark(self, benchmark): - """Benchmark order book processing speed""" - processor = DataProcessor() - generator = DataGenerator() - - def process_orderbook(): - orderbook = generator.generate_orderbook("BTCUSDT", "binance") - return processor.normalize_orderbook(orderbook.__dict__, "binance") - - result = benchmark(process_orderbook) - assert result is not None - - def test_trade_processing_benchmark(self, benchmark): - """Benchmark trade processing speed""" - processor = DataProcessor() - generator = DataGenerator() - - def process_trade(): - trade = generator.generate_trade("BTCUSDT", "binance") - return processor.normalize_trade(trade.__dict__, "binance") - - result = benchmark(process_trade) - assert result is not None - - def test_aggregation_benchmark(self, benchmark): - """Benchmark aggregation engine performance""" - aggregator = AggregationEngine() - generator = DataGenerator() - - def aggregate_data(): - orderbook = generator.generate_orderbook("BTCUSDT", "binance") - buckets = aggregator.create_price_buckets(orderbook) - return aggregator.generate_heatmap(buckets) - - result = benchmark(aggregate_data) - assert result is not None - - -def pytest_configure(config): - """Configure pytest with custom markers""" - config.addinivalue_line("markers", "load: mark test as load test") - config.addinivalue_line("markers", "benchmark: mark test as benchmark") - - -def pytest_addoption(parser): - """Add custom command line options""" - parser.addoption( - "--load", - action="store_true", - default=False, - help="run load tests" - ) - parser.addoption( - "--benchmark", - action="store_true", - default=False, - help="run benchmark tests" - ) \ No newline at end of file