From de9fa4a421d20abe9ebf40620718419dffd7d533 Mon Sep 17 00:00:00 2001 From: Dobromir Popov Date: Mon, 4 Aug 2025 15:50:54 +0300 Subject: [PATCH] COBY : specs + task 1 --- .../multi-exchange-data-aggregation/design.md | 448 ++++++++++++++++++ .../requirements.md | 103 ++++ .../multi-exchange-data-aggregation/tasks.md | 160 +++++++ COBY/README.md | 231 +++++++++ COBY/__init__.py | 9 + COBY/config.py | 176 +++++++ COBY/docker/README.md | 273 +++++++++++ COBY/docker/backup.sh | 108 +++++ COBY/docker/deploy.sh | 112 +++++ .../init-scripts/01-init-timescaledb.sql | 214 +++++++++ COBY/docker/manual-init.sh | 37 ++ COBY/docker/redis.conf | 131 +++++ COBY/docker/restore.sh | 188 ++++++++ COBY/docker/timescaledb-compose.yml | 78 +++ COBY/interfaces/__init__.py | 17 + COBY/interfaces/aggregation_engine.py | 139 ++++++ COBY/interfaces/data_processor.py | 119 +++++ COBY/interfaces/exchange_connector.py | 189 ++++++++ COBY/interfaces/replay_manager.py | 212 +++++++++ COBY/interfaces/storage_manager.py | 215 +++++++++ COBY/models/__init__.py | 31 ++ COBY/models/core.py | 324 +++++++++++++ COBY/utils/__init__.py | 22 + COBY/utils/exceptions.py | 57 +++ COBY/utils/logging.py | 149 ++++++ COBY/utils/timing.py | 206 ++++++++ COBY/utils/validation.py | 217 +++++++++ web/clean_dashboard.py | 1 - 28 files changed, 4165 insertions(+), 1 deletion(-) create mode 100644 .kiro/specs/multi-exchange-data-aggregation/design.md create mode 100644 .kiro/specs/multi-exchange-data-aggregation/requirements.md create mode 100644 .kiro/specs/multi-exchange-data-aggregation/tasks.md create mode 100644 COBY/README.md create mode 100644 COBY/__init__.py create mode 100644 COBY/config.py create mode 100644 COBY/docker/README.md create mode 100644 COBY/docker/backup.sh create mode 100644 COBY/docker/deploy.sh create mode 100644 COBY/docker/init-scripts/01-init-timescaledb.sql create mode 100644 COBY/docker/manual-init.sh create mode 100644 COBY/docker/redis.conf create mode 100644 COBY/docker/restore.sh create mode 100644 COBY/docker/timescaledb-compose.yml create mode 100644 COBY/interfaces/__init__.py create mode 100644 COBY/interfaces/aggregation_engine.py create mode 100644 COBY/interfaces/data_processor.py create mode 100644 COBY/interfaces/exchange_connector.py create mode 100644 COBY/interfaces/replay_manager.py create mode 100644 COBY/interfaces/storage_manager.py create mode 100644 COBY/models/__init__.py create mode 100644 COBY/models/core.py create mode 100644 COBY/utils/__init__.py create mode 100644 COBY/utils/exceptions.py create mode 100644 COBY/utils/logging.py create mode 100644 COBY/utils/timing.py create mode 100644 COBY/utils/validation.py diff --git a/.kiro/specs/multi-exchange-data-aggregation/design.md b/.kiro/specs/multi-exchange-data-aggregation/design.md new file mode 100644 index 0000000..35f7a0a --- /dev/null +++ b/.kiro/specs/multi-exchange-data-aggregation/design.md @@ -0,0 +1,448 @@ +# Design Document + +## Overview + +The Multi-Exchange Data Aggregation System is a comprehensive data collection and processing subsystem designed to serve as the foundational data layer for the trading orchestrator. The system will collect real-time order book and OHLCV data from the top 10 cryptocurrency exchanges, aggregate it into standardized formats, store it in a TimescaleDB time-series database, and provide both live data feeds and historical replay capabilities. 
+ +The system follows a microservices architecture with containerized components, ensuring scalability, maintainability, and seamless integration with the existing trading infrastructure. + +We implement it in the `.\COBY` subfolder for easy integration with the existing system + +## Architecture + +### High-Level Architecture + +```mermaid +graph TB + subgraph "Exchange Connectors" + E1[Binance WebSocket] + E2[Coinbase WebSocket] + E3[Kraken WebSocket] + E4[Bybit WebSocket] + E5[OKX WebSocket] + E6[Huobi WebSocket] + E7[KuCoin WebSocket] + E8[Gate.io WebSocket] + E9[Bitfinex WebSocket] + E10[MEXC WebSocket] + end + + subgraph "Data Processing Layer" + DP[Data Processor] + AGG[Aggregation Engine] + NORM[Data Normalizer] + end + + subgraph "Storage Layer" + TSDB[(TimescaleDB)] + CACHE[Redis Cache] + end + + subgraph "API Layer" + LIVE[Live Data API] + REPLAY[Replay API] + WEB[Web Dashboard] + end + + subgraph "Integration Layer" + ORCH[Orchestrator Interface] + ADAPTER[Data Adapter] + end + + E1 --> DP + E2 --> DP + E3 --> DP + E4 --> DP + E5 --> DP + E6 --> DP + E7 --> DP + E8 --> DP + E9 --> DP + E10 --> DP + + DP --> NORM + NORM --> AGG + AGG --> TSDB + AGG --> CACHE + + CACHE --> LIVE + TSDB --> REPLAY + LIVE --> WEB + REPLAY --> WEB + + LIVE --> ADAPTER + REPLAY --> ADAPTER + ADAPTER --> ORCH +``` + +### Component Architecture + +The system is organized into several key components: + +1. **Exchange Connectors**: WebSocket clients for each exchange +2. **Data Processing Engine**: Normalizes and validates incoming data +3. **Aggregation Engine**: Creates price buckets and heatmaps +4. **Storage Layer**: TimescaleDB for persistence, Redis for caching +5. **API Layer**: REST and WebSocket APIs for data access +6. **Web Dashboard**: Real-time visualization interface +7. 
**Integration Layer**: Orchestrator-compatible interface + +## Components and Interfaces + +### Exchange Connector Interface + +```python +class ExchangeConnector: + """Base interface for exchange WebSocket connectors""" + + async def connect(self) -> bool + async def disconnect(self) -> None + async def subscribe_orderbook(self, symbol: str) -> None + async def subscribe_trades(self, symbol: str) -> None + def get_connection_status(self) -> ConnectionStatus + def add_data_callback(self, callback: Callable) -> None +``` + +### Data Processing Interface + +```python +class DataProcessor: + """Processes and normalizes raw exchange data""" + + def normalize_orderbook(self, raw_data: Dict, exchange: str) -> OrderBookSnapshot + def normalize_trade(self, raw_data: Dict, exchange: str) -> TradeEvent + def validate_data(self, data: Union[OrderBookSnapshot, TradeEvent]) -> bool + def calculate_metrics(self, orderbook: OrderBookSnapshot) -> OrderBookMetrics +``` + +### Aggregation Engine Interface + +```python +class AggregationEngine: + """Aggregates data into price buckets and heatmaps""" + + def create_price_buckets(self, orderbook: OrderBookSnapshot, bucket_size: float) -> PriceBuckets + def update_heatmap(self, symbol: str, buckets: PriceBuckets) -> HeatmapData + def calculate_imbalances(self, orderbook: OrderBookSnapshot) -> ImbalanceMetrics + def aggregate_across_exchanges(self, symbol: str) -> ConsolidatedOrderBook +``` + +### Storage Interface + +```python +class StorageManager: + """Manages data persistence and retrieval""" + + async def store_orderbook(self, data: OrderBookSnapshot) -> bool + async def store_trade(self, data: TradeEvent) -> bool + async def get_historical_data(self, symbol: str, start: datetime, end: datetime) -> List[Dict] + async def get_latest_data(self, symbol: str) -> Dict + def setup_database_schema(self) -> None +``` + +### Replay Interface + +```python +class ReplayManager: + """Provides historical data replay functionality""" + + def create_replay_session(self, start_time: datetime, end_time: datetime, speed: float) -> str + async def start_replay(self, session_id: str) -> None + async def pause_replay(self, session_id: str) -> None + async def stop_replay(self, session_id: str) -> None + def get_replay_status(self, session_id: str) -> ReplayStatus +``` + +## Data Models + +### Core Data Structures + +```python +@dataclass +class OrderBookSnapshot: + """Standardized order book snapshot""" + symbol: str + exchange: str + timestamp: datetime + bids: List[PriceLevel] + asks: List[PriceLevel] + sequence_id: Optional[int] = None + +@dataclass +class PriceLevel: + """Individual price level in order book""" + price: float + size: float + count: Optional[int] = None + +@dataclass +class TradeEvent: + """Standardized trade event""" + symbol: str + exchange: str + timestamp: datetime + price: float + size: float + side: str # 'buy' or 'sell' + trade_id: str + +@dataclass +class PriceBuckets: + """Aggregated price buckets for heatmap""" + symbol: str + timestamp: datetime + bucket_size: float + bid_buckets: Dict[float, float] # price -> volume + ask_buckets: Dict[float, float] # price -> volume + +@dataclass +class HeatmapData: + """Heatmap visualization data""" + symbol: str + timestamp: datetime + bucket_size: float + data: List[HeatmapPoint] + +@dataclass +class HeatmapPoint: + """Individual heatmap data point""" + price: float + volume: float + intensity: float # 0.0 to 1.0 + side: str # 'bid' or 'ask' +``` + +### Database Schema + +#### TimescaleDB Tables + +```sql +-- 
Order book snapshots table +CREATE TABLE order_book_snapshots ( + id BIGSERIAL, + symbol VARCHAR(20) NOT NULL, + exchange VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + bids JSONB NOT NULL, + asks JSONB NOT NULL, + sequence_id BIGINT, + mid_price DECIMAL(20,8), + spread DECIMAL(20,8), + bid_volume DECIMAL(30,8), + ask_volume DECIMAL(30,8), + PRIMARY KEY (timestamp, symbol, exchange) +); + +-- Convert to hypertable +SELECT create_hypertable('order_book_snapshots', 'timestamp'); + +-- Trade events table +CREATE TABLE trade_events ( + id BIGSERIAL, + symbol VARCHAR(20) NOT NULL, + exchange VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + price DECIMAL(20,8) NOT NULL, + size DECIMAL(30,8) NOT NULL, + side VARCHAR(4) NOT NULL, + trade_id VARCHAR(100) NOT NULL, + PRIMARY KEY (timestamp, symbol, exchange, trade_id) +); + +-- Convert to hypertable +SELECT create_hypertable('trade_events', 'timestamp'); + +-- Aggregated heatmap data table +CREATE TABLE heatmap_data ( + symbol VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + bucket_size DECIMAL(10,2) NOT NULL, + price_bucket DECIMAL(20,8) NOT NULL, + volume DECIMAL(30,8) NOT NULL, + side VARCHAR(3) NOT NULL, + exchange_count INTEGER NOT NULL, + PRIMARY KEY (timestamp, symbol, bucket_size, price_bucket, side) +); + +-- Convert to hypertable +SELECT create_hypertable('heatmap_data', 'timestamp'); + +-- OHLCV data table +CREATE TABLE ohlcv_data ( + symbol VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + timeframe VARCHAR(10) NOT NULL, + open_price DECIMAL(20,8) NOT NULL, + high_price DECIMAL(20,8) NOT NULL, + low_price DECIMAL(20,8) NOT NULL, + close_price DECIMAL(20,8) NOT NULL, + volume DECIMAL(30,8) NOT NULL, + trade_count INTEGER, + PRIMARY KEY (timestamp, symbol, timeframe) +); + +-- Convert to hypertable +SELECT create_hypertable('ohlcv_data', 'timestamp'); +``` + +## Error Handling + +### Connection Management + +The system implements robust error handling for exchange connections: + +1. **Exponential Backoff**: Failed connections retry with increasing delays +2. **Circuit Breaker**: Temporarily disable problematic exchanges +3. **Graceful Degradation**: Continue operation with available exchanges +4. **Health Monitoring**: Continuous monitoring of connection status + +### Data Validation + +All incoming data undergoes validation: + +1. **Schema Validation**: Ensure data structure compliance +2. **Range Validation**: Check price and volume ranges +3. **Timestamp Validation**: Verify temporal consistency +4. **Duplicate Detection**: Prevent duplicate data storage + +### Database Resilience + +Database operations include comprehensive error handling: + +1. **Connection Pooling**: Maintain multiple database connections +2. **Transaction Management**: Ensure data consistency +3. **Retry Logic**: Automatic retry for transient failures +4. **Backup Strategies**: Regular data backups and recovery procedures + +## Testing Strategy + +### Unit Testing + +Each component will have comprehensive unit tests: + +1. **Exchange Connectors**: Mock WebSocket responses +2. **Data Processing**: Test normalization and validation +3. **Aggregation Engine**: Verify bucket calculations +4. **Storage Layer**: Test database operations +5. **API Layer**: Test endpoint responses + +### Integration Testing + +End-to-end testing scenarios: + +1. **Multi-Exchange Data Flow**: Test complete data pipeline +2. **Database Integration**: Verify TimescaleDB operations +3. **API Integration**: Test orchestrator interface compatibility +4. 
**Performance Testing**: Load testing with high-frequency data + +### Performance Testing + +Performance benchmarks and testing: + +1. **Throughput Testing**: Measure data processing capacity +2. **Latency Testing**: Measure end-to-end data latency +3. **Memory Usage**: Monitor memory consumption patterns +4. **Database Performance**: Query performance optimization + +### Monitoring and Observability + +Comprehensive monitoring system: + +1. **Metrics Collection**: Prometheus-compatible metrics +2. **Logging**: Structured logging with correlation IDs +3. **Alerting**: Real-time alerts for system issues +4. **Dashboards**: Grafana dashboards for system monitoring + +## Deployment Architecture + +### Docker Containerization + +The system will be deployed using Docker containers: + +```yaml +# docker-compose.yml +version: '3.8' +services: + timescaledb: + image: timescale/timescaledb:latest-pg14 + environment: + POSTGRES_DB: market_data + POSTGRES_USER: market_user + POSTGRES_PASSWORD: ${DB_PASSWORD} + volumes: + - timescale_data:/var/lib/postgresql/data + ports: + - "5432:5432" + + redis: + image: redis:7-alpine + ports: + - "6379:6379" + volumes: + - redis_data:/data + + data-aggregator: + build: ./data-aggregator + environment: + - DB_HOST=timescaledb + - REDIS_HOST=redis + - LOG_LEVEL=INFO + depends_on: + - timescaledb + - redis + + web-dashboard: + build: ./web-dashboard + ports: + - "8080:8080" + environment: + - API_HOST=data-aggregator + depends_on: + - data-aggregator + +volumes: + timescale_data: + redis_data: +``` + +### Configuration Management + +Environment-based configuration: + +```python +# config.py +@dataclass +class Config: + # Database settings + db_host: str = os.getenv('DB_HOST', 'localhost') + db_port: int = int(os.getenv('DB_PORT', '5432')) + db_name: str = os.getenv('DB_NAME', 'market_data') + db_user: str = os.getenv('DB_USER', 'market_user') + db_password: str = os.getenv('DB_PASSWORD', '') + + # Redis settings + redis_host: str = os.getenv('REDIS_HOST', 'localhost') + redis_port: int = int(os.getenv('REDIS_PORT', '6379')) + + # Exchange settings + exchanges: List[str] = field(default_factory=lambda: [ + 'binance', 'coinbase', 'kraken', 'bybit', 'okx', + 'huobi', 'kucoin', 'gateio', 'bitfinex', 'mexc' + ]) + + # Aggregation settings + btc_bucket_size: float = 10.0 # $10 USD buckets for BTC + eth_bucket_size: float = 1.0 # $1 USD buckets for ETH + + # Performance settings + max_connections_per_exchange: int = 5 + data_buffer_size: int = 10000 + batch_write_size: int = 1000 + + # API settings + api_host: str = os.getenv('API_HOST', '0.0.0.0') + api_port: int = int(os.getenv('API_PORT', '8080')) + websocket_port: int = int(os.getenv('WS_PORT', '8081')) +``` + +This design provides a robust, scalable foundation for multi-exchange data aggregation that seamlessly integrates with the existing trading orchestrator while providing the flexibility for future enhancements and additional exchange integrations. \ No newline at end of file diff --git a/.kiro/specs/multi-exchange-data-aggregation/requirements.md b/.kiro/specs/multi-exchange-data-aggregation/requirements.md new file mode 100644 index 0000000..1c86581 --- /dev/null +++ b/.kiro/specs/multi-exchange-data-aggregation/requirements.md @@ -0,0 +1,103 @@ +# Requirements Document + +## Introduction + +This document outlines the requirements for a comprehensive data collection and aggregation subsystem that will serve as a foundational component for the trading orchestrator. 
The system will collect, aggregate, and store real-time order book and OHLCV data from multiple cryptocurrency exchanges, providing both live data feeds and historical replay capabilities for model training and backtesting. + +## Requirements + +### Requirement 1 + +**User Story:** As a trading system developer, I want to collect real-time order book data from top 10 cryptocurrency exchanges, so that I can have comprehensive market data for analysis and trading decisions. + +#### Acceptance Criteria + +1. WHEN the system starts THEN it SHALL establish WebSocket connections to up to 10 major cryptocurrency exchanges +2. WHEN order book updates are received THEN the system SHALL process and store raw order book events in real-time +3. WHEN processing order book data THEN the system SHALL handle connection failures gracefully and automatically reconnect +4. WHEN multiple exchanges provide data THEN the system SHALL normalize data formats to a consistent structure +5. IF an exchange connection fails THEN the system SHALL log the failure and attempt reconnection with exponential backoff + +### Requirement 2 + +**User Story:** As a trading analyst, I want order book data aggregated into price buckets with heatmap visualization, so that I can quickly identify market depth and liquidity patterns. + +#### Acceptance Criteria + +1. WHEN processing BTC order book data THEN the system SHALL aggregate orders into $10 USD price range buckets +2. WHEN processing ETH order book data THEN the system SHALL aggregate orders into $1 USD price range buckets +3. WHEN aggregating order data THEN the system SHALL maintain separate bid and ask heatmaps +4. WHEN building heatmaps THEN the system SHALL update distribution data at high frequency (sub-second) +5. WHEN displaying heatmaps THEN the system SHALL show volume intensity using color gradients or progress bars + +### Requirement 3 + +**User Story:** As a system architect, I want all market data stored in a TimescaleDB database, so that I can efficiently query time-series data and maintain historical records. + +#### Acceptance Criteria + +1. WHEN the system initializes THEN it SHALL connect to a TimescaleDB instance running in a Docker container +2. WHEN storing order book events THEN the system SHALL use TimescaleDB's time-series optimized storage +3. WHEN storing OHLCV data THEN the system SHALL create appropriate time-series tables with proper indexing +4. WHEN writing to database THEN the system SHALL batch writes for optimal performance +5. IF database connection fails THEN the system SHALL queue data in memory and retry with backoff strategy + +### Requirement 4 + +**User Story:** As a trading system operator, I want a web-based dashboard to monitor real-time order book heatmaps, so that I can visualize market conditions across multiple exchanges. + +#### Acceptance Criteria + +1. WHEN accessing the web dashboard THEN it SHALL display real-time order book heatmaps for BTC and ETH +2. WHEN viewing heatmaps THEN the dashboard SHALL show aggregated data from all connected exchanges +3. WHEN displaying progress bars THEN they SHALL always show aggregated values across price buckets +4. WHEN updating the display THEN the dashboard SHALL refresh data at least once per second +5. 
WHEN an exchange goes offline THEN the dashboard SHALL indicate the status change visually + +### Requirement 5 + +**User Story:** As a model trainer, I want a replay interface that can provide historical data in the same format as live data, so that I can train models on past market events. + +#### Acceptance Criteria + +1. WHEN requesting historical data THEN the replay interface SHALL provide data in the same structure as live feeds +2. WHEN replaying data THEN the system SHALL maintain original timing relationships between events +3. WHEN using replay mode THEN the interface SHALL support configurable playback speeds +4. WHEN switching between live and replay modes THEN the orchestrator SHALL receive data through the same interface +5. IF replay data is requested for unavailable time periods THEN the system SHALL return appropriate error messages + +### Requirement 6 + +**User Story:** As a trading system integrator, I want the data aggregation system to follow the same interface as the current orchestrator data provider, so that I can seamlessly integrate it into existing workflows. + +#### Acceptance Criteria + +1. WHEN the orchestrator requests data THEN the aggregation system SHALL provide data in the expected format +2. WHEN integrating with existing systems THEN the interface SHALL be compatible with current data provider contracts +3. WHEN providing aggregated data THEN the system SHALL include metadata about data sources and quality +4. WHEN the orchestrator switches data sources THEN it SHALL work without code changes +5. IF data quality issues are detected THEN the system SHALL provide quality indicators in the response + +### Requirement 7 + +**User Story:** As a system administrator, I want the data collection system to be containerized and easily deployable, so that I can manage it alongside other system components. + +#### Acceptance Criteria + +1. WHEN deploying the system THEN it SHALL run in Docker containers with proper resource allocation +2. WHEN starting services THEN TimescaleDB SHALL be automatically provisioned in its own container +3. WHEN configuring the system THEN all settings SHALL be externalized through environment variables or config files +4. WHEN monitoring the system THEN it SHALL provide health check endpoints for container orchestration +5. IF containers need to be restarted THEN the system SHALL recover gracefully without data loss + +### Requirement 8 + +**User Story:** As a performance engineer, I want the system to handle high-frequency data efficiently, so that it can process order book updates from multiple exchanges without latency issues. + +#### Acceptance Criteria + +1. WHEN processing order book updates THEN the system SHALL handle at least 10 updates per second per exchange +2. WHEN aggregating data THEN processing latency SHALL be less than 10 milliseconds per update +3. WHEN storing data THEN the system SHALL use efficient batching to minimize database overhead +4. WHEN memory usage grows THEN the system SHALL implement appropriate cleanup and garbage collection +5. IF processing falls behind THEN the system SHALL prioritize recent data and log performance warnings \ No newline at end of file diff --git a/.kiro/specs/multi-exchange-data-aggregation/tasks.md b/.kiro/specs/multi-exchange-data-aggregation/tasks.md new file mode 100644 index 0000000..c2cd82c --- /dev/null +++ b/.kiro/specs/multi-exchange-data-aggregation/tasks.md @@ -0,0 +1,160 @@ +# Implementation Plan + +- [x] 1. 
Set up project structure and core interfaces + + + + - Create directory structure in `.\COBY` subfolder for the multi-exchange data aggregation system + - Define base interfaces and data models for exchange connectors, data processing, and storage + - Implement configuration management system with environment variable support + - _Requirements: 1.1, 6.1, 7.3_ + +- [ ] 2. Implement TimescaleDB integration and database schema + - Create TimescaleDB connection manager with connection pooling + - Implement database schema creation with hypertables for time-series optimization + - Write database operations for storing order book snapshots and trade events + - Create database migration system for schema updates + - _Requirements: 3.1, 3.2, 3.3, 3.4_ + +- [ ] 3. Create base exchange connector framework + - Implement abstract base class for exchange WebSocket connectors + - Create connection management with exponential backoff and circuit breaker patterns + - Implement WebSocket message handling with proper error recovery + - Add connection status monitoring and health checks + - _Requirements: 1.1, 1.3, 1.4, 8.5_ + +- [ ] 4. Implement Binance exchange connector + - Create Binance-specific WebSocket connector extending the base framework + - Implement order book depth stream subscription and processing + - Add trade stream subscription for volume analysis + - Implement data normalization from Binance format to standard format + - Write unit tests for Binance connector functionality + - _Requirements: 1.1, 1.2, 1.4, 6.2_ + +- [ ] 5. Create data processing and normalization engine + - Implement data processor for normalizing raw exchange data + - Create validation logic for order book and trade data + - Implement data quality checks and filtering + - Add metrics calculation for order book statistics + - Write comprehensive unit tests for data processing logic + - _Requirements: 1.4, 6.3, 8.1_ + +- [ ] 6. Implement price bucket aggregation system + - Create aggregation engine for converting order book data to price buckets + - Implement configurable bucket sizes ($10 for BTC, $1 for ETH) + - Create heatmap data structure generation from price buckets + - Implement real-time aggregation with high-frequency updates + - Add volume-weighted aggregation calculations + - _Requirements: 2.1, 2.2, 2.3, 2.4, 8.1, 8.2_ + +- [ ] 7. Build Redis caching layer + - Implement Redis connection manager with connection pooling + - Create caching strategies for latest order book data and heatmaps + - Implement cache invalidation and TTL management + - Add cache performance monitoring and metrics + - Write tests for caching functionality + - _Requirements: 8.2, 8.3_ + +- [ ] 8. Create live data API endpoints + - Implement REST API for accessing current order book data + - Create WebSocket API for real-time data streaming + - Add endpoints for heatmap data retrieval + - Implement API rate limiting and authentication + - Create comprehensive API documentation + - _Requirements: 4.1, 4.2, 4.4, 6.3_ + +- [ ] 9. Implement web dashboard for visualization + - Create HTML/CSS/JavaScript dashboard for real-time heatmap visualization + - Implement WebSocket client for receiving real-time updates + - Create progress bar visualization for aggregated price buckets + - Add exchange status indicators and connection monitoring + - Implement responsive design for different screen sizes + - _Requirements: 4.1, 4.2, 4.3, 4.5_ + +- [ ] 10. 
Build historical data replay system + - Create replay manager for historical data playback + - Implement configurable playback speeds and time range selection + - Create replay session management with start/pause/stop controls + - Implement data streaming interface compatible with live data format + - Add replay status monitoring and progress tracking + - _Requirements: 5.1, 5.2, 5.3, 5.4, 5.5_ + +- [ ] 11. Create orchestrator integration interface + - Implement data adapter that matches existing orchestrator interface + - Create compatibility layer for seamless integration with current data provider + - Add data quality indicators and metadata in responses + - Implement switching mechanism between live and replay modes + - Write integration tests with existing orchestrator code + - _Requirements: 6.1, 6.2, 6.3, 6.4, 6.5_ + +- [ ] 12. Add additional exchange connectors (Coinbase, Kraken) + - Implement Coinbase Pro WebSocket connector with proper authentication + - Create Kraken WebSocket connector with their specific message format + - Add exchange-specific data normalization for both exchanges + - Implement proper error handling for each exchange's quirks + - Write unit tests for both new exchange connectors + - _Requirements: 1.1, 1.2, 1.4_ + +- [ ] 13. Implement remaining exchange connectors (Bybit, OKX, Huobi) + - Create Bybit WebSocket connector with unified trading account support + - Implement OKX connector with their V5 API WebSocket streams + - Add Huobi Global connector with proper symbol mapping + - Ensure all connectors follow the same interface and error handling patterns + - Write comprehensive tests for all three exchange connectors + - _Requirements: 1.1, 1.2, 1.4_ + +- [ ] 14. Complete exchange connector suite (KuCoin, Gate.io, Bitfinex, MEXC) + - Implement KuCoin connector with proper token-based authentication + - Create Gate.io connector with their WebSocket v4 API + - Add Bitfinex connector with proper channel subscription management + - Implement MEXC connector with their WebSocket streams + - Ensure all 10 exchanges are properly integrated and tested + - _Requirements: 1.1, 1.2, 1.4_ + +- [ ] 15. Implement cross-exchange data consolidation + - Create consolidation engine that merges order book data from multiple exchanges + - Implement weighted aggregation based on exchange liquidity and reliability + - Add conflict resolution for price discrepancies between exchanges + - Create consolidated heatmap that shows combined market depth + - Write tests for multi-exchange aggregation scenarios + - _Requirements: 2.5, 4.2_ + +- [ ] 16. Add performance monitoring and optimization + - Implement comprehensive metrics collection for all system components + - Create performance monitoring dashboard with key system metrics + - Add latency tracking for end-to-end data processing + - Implement memory usage monitoring and garbage collection optimization + - Create alerting system for performance degradation + - _Requirements: 8.1, 8.2, 8.3, 8.4, 8.5_ + +- [ ] 17. Create Docker containerization and deployment + - Write Dockerfiles for all system components + - Create docker-compose configuration for local development + - Implement health check endpoints for container orchestration + - Add environment variable configuration for all services + - Create deployment scripts and documentation + - _Requirements: 7.1, 7.2, 7.3, 7.4, 7.5_ + +- [ ] 18. 
Implement comprehensive testing suite + - Create integration tests for complete data pipeline from exchanges to storage + - Implement load testing for high-frequency data scenarios + - Add end-to-end tests for web dashboard functionality + - Create performance benchmarks and regression tests + - Write documentation for running and maintaining tests + - _Requirements: 8.1, 8.2, 8.3, 8.4_ + +- [ ] 19. Add system monitoring and alerting + - Implement structured logging with correlation IDs across all components + - Create Prometheus metrics exporters for system monitoring + - Add Grafana dashboards for system visualization + - Implement alerting rules for system failures and performance issues + - Create runbook documentation for common operational scenarios + - _Requirements: 7.4, 8.5_ + +- [ ] 20. Final integration and system testing + - Integrate the complete system with existing trading orchestrator + - Perform end-to-end testing with real market data + - Validate replay functionality with historical data scenarios + - Test failover scenarios and system resilience + - Create user documentation and operational guides + - _Requirements: 6.1, 6.2, 6.4, 5.1, 5.2_ \ No newline at end of file diff --git a/COBY/README.md b/COBY/README.md new file mode 100644 index 0000000..23d0653 --- /dev/null +++ b/COBY/README.md @@ -0,0 +1,231 @@ +# COBY - Multi-Exchange Data Aggregation System + +COBY (Cryptocurrency Order Book Yielder) is a comprehensive data collection and aggregation subsystem designed to serve as the foundational data layer for trading systems. It collects real-time order book and OHLCV data from multiple cryptocurrency exchanges, aggregates it into standardized formats, and provides both live data feeds and historical replay capabilities. + +## ๐Ÿ—๏ธ Architecture + +The system follows a modular architecture with clear separation of concerns: + +``` +COBY/ +โ”œโ”€โ”€ config.py # Configuration management +โ”œโ”€โ”€ models/ # Data models and structures +โ”‚ โ”œโ”€โ”€ __init__.py +โ”‚ โ””โ”€โ”€ core.py # Core data models +โ”œโ”€โ”€ interfaces/ # Abstract interfaces +โ”‚ โ”œโ”€โ”€ __init__.py +โ”‚ โ”œโ”€โ”€ exchange_connector.py +โ”‚ โ”œโ”€โ”€ data_processor.py +โ”‚ โ”œโ”€โ”€ aggregation_engine.py +โ”‚ โ”œโ”€โ”€ storage_manager.py +โ”‚ โ””โ”€โ”€ replay_manager.py +โ”œโ”€โ”€ utils/ # Utility functions +โ”‚ โ”œโ”€โ”€ __init__.py +โ”‚ โ”œโ”€โ”€ exceptions.py +โ”‚ โ”œโ”€โ”€ logging.py +โ”‚ โ”œโ”€โ”€ validation.py +โ”‚ โ””โ”€โ”€ timing.py +โ””โ”€โ”€ README.md +``` + +## ๐Ÿš€ Features + +- **Multi-Exchange Support**: Connect to 10+ major cryptocurrency exchanges +- **Real-Time Data**: High-frequency order book and trade data collection +- **Price Bucket Aggregation**: Configurable price buckets ($10 for BTC, $1 for ETH) +- **Heatmap Visualization**: Real-time market depth heatmaps +- **Historical Replay**: Replay past market events for model training +- **TimescaleDB Storage**: Optimized time-series data storage +- **Redis Caching**: High-performance data caching layer +- **Orchestrator Integration**: Compatible with existing trading systems + +## ๐Ÿ“Š Data Models + +### Core Models + +- **OrderBookSnapshot**: Standardized order book data +- **TradeEvent**: Individual trade events +- **PriceBuckets**: Aggregated price bucket data +- **HeatmapData**: Visualization-ready heatmap data +- **ConnectionStatus**: Exchange connection monitoring +- **ReplaySession**: Historical data replay management + +### Key Features + +- Automatic data validation and normalization +- Configurable price bucket sizes 
per symbol +- Real-time metrics calculation +- Cross-exchange data consolidation +- Quality scoring and anomaly detection + +## โš™๏ธ Configuration + +The system uses environment variables for configuration: + +```python +# Database settings +DB_HOST=192.168.0.10 +DB_PORT=5432 +DB_NAME=market_data +DB_USER=market_user +DB_PASSWORD=your_password + +# Redis settings +REDIS_HOST=192.168.0.10 +REDIS_PORT=6379 +REDIS_PASSWORD=your_password + +# Aggregation settings +BTC_BUCKET_SIZE=10.0 +ETH_BUCKET_SIZE=1.0 +HEATMAP_DEPTH=50 +UPDATE_FREQUENCY=0.5 + +# Performance settings +DATA_BUFFER_SIZE=10000 +BATCH_WRITE_SIZE=1000 +MAX_MEMORY_USAGE=2048 +``` + +## ๐Ÿ”Œ Interfaces + +### ExchangeConnector +Abstract base class for exchange WebSocket connectors with: +- Connection management with auto-reconnect +- Order book and trade subscriptions +- Data normalization callbacks +- Health monitoring + +### DataProcessor +Interface for data processing and validation: +- Raw data normalization +- Quality validation +- Metrics calculation +- Anomaly detection + +### AggregationEngine +Interface for data aggregation: +- Price bucket creation +- Heatmap generation +- Cross-exchange consolidation +- Imbalance calculations + +### StorageManager +Interface for data persistence: +- TimescaleDB operations +- Batch processing +- Historical data retrieval +- Storage optimization + +### ReplayManager +Interface for historical data replay: +- Session management +- Configurable playback speeds +- Time-based seeking +- Real-time compatibility + +## ๐Ÿ› ๏ธ Utilities + +### Logging +- Structured logging with correlation IDs +- Configurable log levels and outputs +- Rotating file handlers +- Context-aware logging + +### Validation +- Symbol format validation +- Price and volume validation +- Configuration validation +- Data quality checks + +### Timing +- UTC timestamp handling +- Performance measurement +- Time-based operations +- Interval calculations + +### Exceptions +- Custom exception hierarchy +- Error code management +- Detailed error context +- Structured error responses + +## ๐Ÿ”ง Usage + +### Basic Configuration + +```python +from COBY.config import config + +# Access configuration +db_url = config.get_database_url() +bucket_size = config.get_bucket_size('BTCUSDT') +``` + +### Data Models + +```python +from COBY.models import OrderBookSnapshot, PriceLevel + +# Create order book snapshot +orderbook = OrderBookSnapshot( + symbol='BTCUSDT', + exchange='binance', + timestamp=datetime.now(timezone.utc), + bids=[PriceLevel(50000.0, 1.5)], + asks=[PriceLevel(50100.0, 2.0)] +) + +# Access calculated properties +mid_price = orderbook.mid_price +spread = orderbook.spread +``` + +### Logging + +```python +from COBY.utils import setup_logging, get_logger, set_correlation_id + +# Setup logging +setup_logging(level='INFO', log_file='logs/coby.log') + +# Get logger +logger = get_logger(__name__) + +# Use correlation ID +set_correlation_id('req-123') +logger.info("Processing order book data") +``` + +## ๐Ÿƒ Next Steps + +This is the foundational structure for the COBY system. The next implementation tasks will build upon these interfaces and models to create: + +1. TimescaleDB integration +2. Exchange connector implementations +3. Data processing engines +4. Aggregation algorithms +5. Web dashboard +6. API endpoints +7. Replay functionality + +Each component will implement the defined interfaces, ensuring consistency and maintainability across the entire system. 
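+
+As an illustration of the kind of logic these interfaces will host, the sketch below shows how an `OrderBookSnapshot` could be reduced to price buckets using `config.get_bucket_size()`. This is only a rough outline assuming prices are floored to bucket boundaries; it is not the final `AggregationEngine` implementation, and the helper name `bucket_orderbook` is illustrative.
+
+```python
+from collections import defaultdict
+from typing import Dict
+
+from COBY.config import config
+from COBY.models import OrderBookSnapshot
+
+
+def bucket_orderbook(orderbook: OrderBookSnapshot) -> Dict[str, Dict[float, float]]:
+    """Aggregate bid/ask volume into fixed-size price buckets (sketch only)."""
+    bucket_size = config.get_bucket_size(orderbook.symbol)  # e.g. $10 for BTC, $1 for ETH
+    bids: Dict[float, float] = defaultdict(float)
+    asks: Dict[float, float] = defaultdict(float)
+
+    for level in orderbook.bids:
+        # Floor each price to its bucket boundary and accumulate volume
+        bids[(level.price // bucket_size) * bucket_size] += level.size
+    for level in orderbook.asks:
+        asks[(level.price // bucket_size) * bucket_size] += level.size
+
+    return {"bids": dict(bids), "asks": dict(asks)}
+```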
+ +## ๐Ÿ“ Development Guidelines + +- All components must implement the defined interfaces +- Use the provided data models for consistency +- Follow the logging and error handling patterns +- Validate all input data using the utility functions +- Maintain backward compatibility with the orchestrator interface +- Write comprehensive tests for all functionality + +## ๐Ÿ” Monitoring + +The system provides comprehensive monitoring through: +- Structured logging with correlation IDs +- Performance metrics collection +- Health check endpoints +- Connection status monitoring +- Data quality indicators +- System resource tracking \ No newline at end of file diff --git a/COBY/__init__.py b/COBY/__init__.py new file mode 100644 index 0000000..9d15897 --- /dev/null +++ b/COBY/__init__.py @@ -0,0 +1,9 @@ +""" +Multi-Exchange Data Aggregation System (COBY) + +A comprehensive data collection and aggregation subsystem for cryptocurrency exchanges. +Provides real-time order book data, heatmap visualization, and historical replay capabilities. +""" + +__version__ = "1.0.0" +__author__ = "Trading System Team" \ No newline at end of file diff --git a/COBY/config.py b/COBY/config.py new file mode 100644 index 0000000..6febf1c --- /dev/null +++ b/COBY/config.py @@ -0,0 +1,176 @@ +""" +Configuration management for the multi-exchange data aggregation system. +""" + +import os +from dataclasses import dataclass, field +from typing import List, Dict, Any +from pathlib import Path + + +@dataclass +class DatabaseConfig: + """Database configuration settings""" + host: str = os.getenv('DB_HOST', '192.168.0.10') + port: int = int(os.getenv('DB_PORT', '5432')) + name: str = os.getenv('DB_NAME', 'market_data') + user: str = os.getenv('DB_USER', 'market_user') + password: str = os.getenv('DB_PASSWORD', 'market_data_secure_pass_2024') + schema: str = os.getenv('DB_SCHEMA', 'market_data') + pool_size: int = int(os.getenv('DB_POOL_SIZE', '10')) + max_overflow: int = int(os.getenv('DB_MAX_OVERFLOW', '20')) + pool_timeout: int = int(os.getenv('DB_POOL_TIMEOUT', '30')) + + +@dataclass +class RedisConfig: + """Redis configuration settings""" + host: str = os.getenv('REDIS_HOST', '192.168.0.10') + port: int = int(os.getenv('REDIS_PORT', '6379')) + password: str = os.getenv('REDIS_PASSWORD', 'market_data_redis_2024') + db: int = int(os.getenv('REDIS_DB', '0')) + max_connections: int = int(os.getenv('REDIS_MAX_CONNECTIONS', '50')) + socket_timeout: int = int(os.getenv('REDIS_SOCKET_TIMEOUT', '5')) + socket_connect_timeout: int = int(os.getenv('REDIS_CONNECT_TIMEOUT', '5')) + + +@dataclass +class ExchangeConfig: + """Exchange configuration settings""" + exchanges: List[str] = field(default_factory=lambda: [ + 'binance', 'coinbase', 'kraken', 'bybit', 'okx', + 'huobi', 'kucoin', 'gateio', 'bitfinex', 'mexc' + ]) + symbols: List[str] = field(default_factory=lambda: ['BTCUSDT', 'ETHUSDT']) + max_connections_per_exchange: int = int(os.getenv('MAX_CONNECTIONS_PER_EXCHANGE', '5')) + reconnect_delay: int = int(os.getenv('RECONNECT_DELAY', '5')) + max_reconnect_attempts: int = int(os.getenv('MAX_RECONNECT_ATTEMPTS', '10')) + heartbeat_interval: int = int(os.getenv('HEARTBEAT_INTERVAL', '30')) + + +@dataclass +class AggregationConfig: + """Data aggregation configuration""" + btc_bucket_size: float = float(os.getenv('BTC_BUCKET_SIZE', '10.0')) # $10 USD buckets + eth_bucket_size: float = float(os.getenv('ETH_BUCKET_SIZE', '1.0')) # $1 USD buckets + default_bucket_size: float = float(os.getenv('DEFAULT_BUCKET_SIZE', '1.0')) + heatmap_depth: 
int = int(os.getenv('HEATMAP_DEPTH', '50')) # Number of price levels + update_frequency: float = float(os.getenv('UPDATE_FREQUENCY', '0.5')) # Seconds + volume_threshold: float = float(os.getenv('VOLUME_THRESHOLD', '0.01')) # Minimum volume + + +@dataclass +class PerformanceConfig: + """Performance and optimization settings""" + data_buffer_size: int = int(os.getenv('DATA_BUFFER_SIZE', '10000')) + batch_write_size: int = int(os.getenv('BATCH_WRITE_SIZE', '1000')) + max_memory_usage: int = int(os.getenv('MAX_MEMORY_USAGE', '2048')) # MB + gc_threshold: float = float(os.getenv('GC_THRESHOLD', '0.8')) # 80% of max memory + processing_timeout: int = int(os.getenv('PROCESSING_TIMEOUT', '10')) # Seconds + max_queue_size: int = int(os.getenv('MAX_QUEUE_SIZE', '50000')) + + +@dataclass +class APIConfig: + """API server configuration""" + host: str = os.getenv('API_HOST', '0.0.0.0') + port: int = int(os.getenv('API_PORT', '8080')) + websocket_port: int = int(os.getenv('WS_PORT', '8081')) + cors_origins: List[str] = field(default_factory=lambda: ['*']) + rate_limit: int = int(os.getenv('RATE_LIMIT', '100')) # Requests per minute + max_connections: int = int(os.getenv('MAX_WS_CONNECTIONS', '1000')) + + +@dataclass +class LoggingConfig: + """Logging configuration""" + level: str = os.getenv('LOG_LEVEL', 'INFO') + format: str = os.getenv('LOG_FORMAT', '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + file_path: str = os.getenv('LOG_FILE', 'logs/coby.log') + max_file_size: int = int(os.getenv('LOG_MAX_SIZE', '100')) # MB + backup_count: int = int(os.getenv('LOG_BACKUP_COUNT', '5')) + enable_correlation_id: bool = os.getenv('ENABLE_CORRELATION_ID', 'true').lower() == 'true' + + +@dataclass +class Config: + """Main configuration class""" + database: DatabaseConfig = field(default_factory=DatabaseConfig) + redis: RedisConfig = field(default_factory=RedisConfig) + exchanges: ExchangeConfig = field(default_factory=ExchangeConfig) + aggregation: AggregationConfig = field(default_factory=AggregationConfig) + performance: PerformanceConfig = field(default_factory=PerformanceConfig) + api: APIConfig = field(default_factory=APIConfig) + logging: LoggingConfig = field(default_factory=LoggingConfig) + + # Environment + environment: str = os.getenv('ENVIRONMENT', 'development') + debug: bool = os.getenv('DEBUG', 'false').lower() == 'true' + + def __post_init__(self): + """Post-initialization validation and setup""" + # Create logs directory if it doesn't exist + log_dir = Path(self.logging.file_path).parent + log_dir.mkdir(parents=True, exist_ok=True) + + # Validate bucket sizes + if self.aggregation.btc_bucket_size <= 0: + raise ValueError("BTC bucket size must be positive") + if self.aggregation.eth_bucket_size <= 0: + raise ValueError("ETH bucket size must be positive") + + def get_bucket_size(self, symbol: str) -> float: + """Get bucket size for a specific symbol""" + symbol_upper = symbol.upper() + if 'BTC' in symbol_upper: + return self.aggregation.btc_bucket_size + elif 'ETH' in symbol_upper: + return self.aggregation.eth_bucket_size + else: + return self.aggregation.default_bucket_size + + def get_database_url(self) -> str: + """Get database connection URL""" + return (f"postgresql://{self.database.user}:{self.database.password}" + f"@{self.database.host}:{self.database.port}/{self.database.name}") + + def get_redis_url(self) -> str: + """Get Redis connection URL""" + auth = f":{self.redis.password}@" if self.redis.password else "" + return 
f"redis://{auth}{self.redis.host}:{self.redis.port}/{self.redis.db}" + + def to_dict(self) -> Dict[str, Any]: + """Convert configuration to dictionary""" + return { + 'database': { + 'host': self.database.host, + 'port': self.database.port, + 'name': self.database.name, + 'schema': self.database.schema, + }, + 'redis': { + 'host': self.redis.host, + 'port': self.redis.port, + 'db': self.redis.db, + }, + 'exchanges': { + 'count': len(self.exchanges.exchanges), + 'symbols': self.exchanges.symbols, + }, + 'aggregation': { + 'btc_bucket_size': self.aggregation.btc_bucket_size, + 'eth_bucket_size': self.aggregation.eth_bucket_size, + 'heatmap_depth': self.aggregation.heatmap_depth, + }, + 'api': { + 'host': self.api.host, + 'port': self.api.port, + 'websocket_port': self.api.websocket_port, + }, + 'environment': self.environment, + 'debug': self.debug, + } + + +# Global configuration instance +config = Config() \ No newline at end of file diff --git a/COBY/docker/README.md b/COBY/docker/README.md new file mode 100644 index 0000000..97d1000 --- /dev/null +++ b/COBY/docker/README.md @@ -0,0 +1,273 @@ +# Market Data Infrastructure Docker Setup + +This directory contains Docker Compose configurations and scripts for deploying TimescaleDB and Redis infrastructure for the multi-exchange data aggregation system. + +## ๐Ÿ—๏ธ Architecture + +- **TimescaleDB**: Time-series database optimized for high-frequency market data +- **Redis**: High-performance caching layer for real-time data +- **Network**: Isolated Docker network for secure communication + +## ๐Ÿ“‹ Prerequisites + +- Docker Engine 20.10+ +- Docker Compose 2.0+ +- At least 4GB RAM available for containers +- 50GB+ disk space for data storage + +## ๐Ÿš€ Quick Start + +1. **Copy environment file**: + ```bash + cp .env.example .env + ``` + +2. **Edit configuration** (update passwords and settings): + ```bash + nano .env + ``` + +3. **Deploy infrastructure**: + ```bash + chmod +x deploy.sh + ./deploy.sh + ``` + +4. 
**Verify deployment**: + ```bash + docker-compose -f timescaledb-compose.yml ps + ``` + +## ๐Ÿ“ File Structure + +``` +docker/ +โ”œโ”€โ”€ timescaledb-compose.yml # Main Docker Compose configuration +โ”œโ”€โ”€ init-scripts/ # Database initialization scripts +โ”‚ โ””โ”€โ”€ 01-init-timescaledb.sql +โ”œโ”€โ”€ redis.conf # Redis configuration +โ”œโ”€โ”€ .env # Environment variables +โ”œโ”€โ”€ deploy.sh # Deployment script +โ”œโ”€โ”€ backup.sh # Backup script +โ”œโ”€โ”€ restore.sh # Restore script +โ””โ”€โ”€ README.md # This file +``` + +## โš™๏ธ Configuration + +### Environment Variables + +Key variables in `.env`: + +```bash +# Database credentials +POSTGRES_PASSWORD=your_secure_password +POSTGRES_USER=market_user +POSTGRES_DB=market_data + +# Redis settings +REDIS_PASSWORD=your_redis_password + +# Performance tuning +POSTGRES_SHARED_BUFFERS=256MB +POSTGRES_EFFECTIVE_CACHE_SIZE=1GB +REDIS_MAXMEMORY=2gb +``` + +### TimescaleDB Configuration + +The database is pre-configured with: +- Optimized PostgreSQL settings for time-series data +- TimescaleDB extension enabled +- Hypertables for automatic partitioning +- Retention policies (90 days for raw data) +- Continuous aggregates for common queries +- Proper indexes for query performance + +### Redis Configuration + +Redis is configured for: +- High-frequency data caching +- Memory optimization (2GB limit) +- Persistence with AOF and RDB +- Optimized for order book data structures + +## ๐Ÿ”Œ Connection Details + +After deployment, connect using: + +### TimescaleDB +``` +Host: 192.168.0.10 +Port: 5432 +Database: market_data +Username: market_user +Password: (from .env file) +``` + +### Redis +``` +Host: 192.168.0.10 +Port: 6379 +Password: (from .env file) +``` + +## ๐Ÿ—„๏ธ Database Schema + +The system creates the following tables: + +- `order_book_snapshots`: Real-time order book data +- `trade_events`: Individual trade events +- `heatmap_data`: Aggregated price bucket data +- `ohlcv_data`: OHLCV candlestick data +- `exchange_status`: Exchange connection monitoring +- `system_metrics`: System performance metrics + +## ๐Ÿ’พ Backup & Restore + +### Create Backup +```bash +chmod +x backup.sh +./backup.sh +``` + +Backups are stored in `./backups/` with timestamp. 
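+
+To sanity-check an archive before relying on it, list its contents with standard `tar` (the filename below is the naming pattern produced by `backup.sh`, not an actual file):
+
+```bash
+tar -tzf backups/market_data_backup_YYYYMMDD_HHMMSS.tar.gz
+```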
+ +### Restore from Backup +```bash +chmod +x restore.sh +./restore.sh market_data_backup_YYYYMMDD_HHMMSS.tar.gz +``` + +### Automated Backups + +Set up a cron job for regular backups: +```bash +# Daily backup at 2 AM +0 2 * * * /path/to/docker/backup.sh +``` + +## ๐Ÿ“Š Monitoring + +### Health Checks + +Check service health: +```bash +# TimescaleDB +docker exec market_data_timescaledb pg_isready -U market_user -d market_data + +# Redis +docker exec market_data_redis redis-cli -a your_password ping +``` + +### View Logs +```bash +# All services +docker-compose -f timescaledb-compose.yml logs -f + +# Specific service +docker-compose -f timescaledb-compose.yml logs -f timescaledb +``` + +### Database Queries + +Connect to TimescaleDB: +```bash +docker exec -it market_data_timescaledb psql -U market_user -d market_data +``` + +Example queries: +```sql +-- Check table sizes +SELECT + schemaname, + tablename, + pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size +FROM pg_tables +WHERE schemaname = 'market_data'; + +-- Recent order book data +SELECT * FROM market_data.order_book_snapshots +ORDER BY timestamp DESC LIMIT 10; + +-- Exchange status +SELECT * FROM market_data.exchange_status +ORDER BY timestamp DESC LIMIT 10; +``` + +## ๐Ÿ”ง Maintenance + +### Update Images +```bash +docker-compose -f timescaledb-compose.yml pull +docker-compose -f timescaledb-compose.yml up -d +``` + +### Clean Up Old Data +```bash +# TimescaleDB has automatic retention policies +# Manual cleanup if needed: +docker exec market_data_timescaledb psql -U market_user -d market_data -c " +SELECT drop_chunks('market_data.order_book_snapshots', INTERVAL '30 days'); +" +``` + +### Scale Resources + +Edit `timescaledb-compose.yml` to adjust: +- Memory limits +- CPU limits +- Shared buffers +- Connection limits + +## ๐Ÿšจ Troubleshooting + +### Common Issues + +1. **Port conflicts**: Change ports in compose file if 5432/6379 are in use +2. **Memory issues**: Reduce shared_buffers and Redis maxmemory +3. **Disk space**: Monitor `/var/lib/docker/volumes/` usage +4. **Connection refused**: Check firewall settings and container status + +### Performance Tuning + +1. **TimescaleDB**: + - Adjust `shared_buffers` based on available RAM + - Tune `effective_cache_size` to 75% of system RAM + - Monitor query performance with `pg_stat_statements` + +2. **Redis**: + - Adjust `maxmemory` based on data volume + - Monitor memory usage with `INFO memory` + - Use appropriate eviction policy + +### Recovery Procedures + +1. **Container failure**: `docker-compose restart ` +2. **Data corruption**: Restore from latest backup +3. **Network issues**: Check Docker network configuration +4. 
**Performance degradation**: Review logs and system metrics + +## ๐Ÿ” Security + +- Change default passwords in `.env` +- Use strong passwords (20+ characters) +- Restrict network access to trusted IPs +- Regular security updates +- Monitor access logs +- Enable SSL/TLS for production + +## ๐Ÿ“ž Support + +For issues related to: +- TimescaleDB: Check [TimescaleDB docs](https://docs.timescale.com/) +- Redis: Check [Redis docs](https://redis.io/documentation) +- Docker: Check [Docker docs](https://docs.docker.com/) + +## ๐Ÿ”„ Updates + +This infrastructure supports: +- Rolling updates with zero downtime +- Blue-green deployments +- Automated failover +- Data migration scripts \ No newline at end of file diff --git a/COBY/docker/backup.sh b/COBY/docker/backup.sh new file mode 100644 index 0000000..529280c --- /dev/null +++ b/COBY/docker/backup.sh @@ -0,0 +1,108 @@ +#!/bin/bash + +# Backup script for market data infrastructure +# Run this script regularly to backup your data + +set -e + +# Configuration +BACKUP_DIR="./backups" +TIMESTAMP=$(date +"%Y%m%d_%H%M%S") +RETENTION_DAYS=30 + +# Load environment variables +if [ -f .env ]; then + source .env +fi + +echo "๐Ÿ—„๏ธ Starting backup process..." + +# Create backup directory if it doesn't exist +mkdir -p "$BACKUP_DIR" + +# Backup TimescaleDB +echo "๐Ÿ“Š Backing up TimescaleDB..." +docker exec market_data_timescaledb pg_dump \ + -U market_user \ + -d market_data \ + --verbose \ + --no-password \ + --format=custom \ + --compress=9 \ + > "$BACKUP_DIR/timescaledb_backup_$TIMESTAMP.dump" + +if [ $? -eq 0 ]; then + echo "โœ… TimescaleDB backup completed: timescaledb_backup_$TIMESTAMP.dump" +else + echo "โŒ TimescaleDB backup failed" + exit 1 +fi + +# Backup Redis +echo "๐Ÿ“ฆ Backing up Redis..." +docker exec market_data_redis redis-cli \ + -a "$REDIS_PASSWORD" \ + --rdb /data/redis_backup_$TIMESTAMP.rdb \ + BGSAVE + +# Wait for Redis backup to complete +sleep 5 + +# Copy Redis backup from container +docker cp market_data_redis:/data/redis_backup_$TIMESTAMP.rdb "$BACKUP_DIR/" + +if [ $? -eq 0 ]; then + echo "โœ… Redis backup completed: redis_backup_$TIMESTAMP.rdb" +else + echo "โŒ Redis backup failed" + exit 1 +fi + +# Create backup metadata +cat > "$BACKUP_DIR/backup_$TIMESTAMP.info" << EOF +Backup Information +================== +Timestamp: $TIMESTAMP +Date: $(date) +TimescaleDB Backup: timescaledb_backup_$TIMESTAMP.dump +Redis Backup: redis_backup_$TIMESTAMP.rdb + +Container Versions: +TimescaleDB: $(docker exec market_data_timescaledb psql -U market_user -d market_data -t -c "SELECT version();") +Redis: $(docker exec market_data_redis redis-cli -a "$REDIS_PASSWORD" INFO server | grep redis_version) + +Database Size: +$(docker exec market_data_timescaledb psql -U market_user -d market_data -c "\l+") +EOF + +# Compress backups +echo "๐Ÿ—œ๏ธ Compressing backups..." +tar -czf "$BACKUP_DIR/market_data_backup_$TIMESTAMP.tar.gz" \ + -C "$BACKUP_DIR" \ + "timescaledb_backup_$TIMESTAMP.dump" \ + "redis_backup_$TIMESTAMP.rdb" \ + "backup_$TIMESTAMP.info" + +# Remove individual files after compression +rm "$BACKUP_DIR/timescaledb_backup_$TIMESTAMP.dump" +rm "$BACKUP_DIR/redis_backup_$TIMESTAMP.rdb" +rm "$BACKUP_DIR/backup_$TIMESTAMP.info" + +echo "โœ… Compressed backup created: market_data_backup_$TIMESTAMP.tar.gz" + +# Clean up old backups +echo "๐Ÿงน Cleaning up old backups (older than $RETENTION_DAYS days)..." 
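+# -mtime +$RETENTION_DAYS matches archives last modified more than RETENTION_DAYS days ago,
+# so only backups inside the retention window are kept.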
+find "$BACKUP_DIR" -name "market_data_backup_*.tar.gz" -mtime +$RETENTION_DAYS -delete + +# Display backup information +BACKUP_SIZE=$(du -h "$BACKUP_DIR/market_data_backup_$TIMESTAMP.tar.gz" | cut -f1) +echo "" +echo "๐Ÿ“‹ Backup Summary:" +echo " File: market_data_backup_$TIMESTAMP.tar.gz" +echo " Size: $BACKUP_SIZE" +echo " Location: $BACKUP_DIR" +echo "" +echo "๐Ÿ”„ To restore from this backup:" +echo " ./restore.sh market_data_backup_$TIMESTAMP.tar.gz" +echo "" +echo "โœ… Backup process completed successfully!" \ No newline at end of file diff --git a/COBY/docker/deploy.sh b/COBY/docker/deploy.sh new file mode 100644 index 0000000..23c500f --- /dev/null +++ b/COBY/docker/deploy.sh @@ -0,0 +1,112 @@ +#!/bin/bash + +# Deployment script for market data infrastructure +# Run this on your Docker host at 192.168.0.10 + +set -e + +echo "๐Ÿš€ Deploying Market Data Infrastructure..." + +# Check if Docker and Docker Compose are available +if ! command -v docker &> /dev/null; then + echo "โŒ Docker is not installed or not in PATH" + exit 1 +fi + +if ! command -v docker-compose &> /dev/null && ! docker compose version &> /dev/null; then + echo "โŒ Docker Compose is not installed or not in PATH" + exit 1 +fi + +# Set Docker Compose command +if docker compose version &> /dev/null; then + DOCKER_COMPOSE="docker compose" +else + DOCKER_COMPOSE="docker-compose" +fi + +# Create necessary directories +echo "๐Ÿ“ Creating directories..." +mkdir -p ./data/timescale +mkdir -p ./data/redis +mkdir -p ./logs +mkdir -p ./backups + +# Set proper permissions +echo "๐Ÿ” Setting permissions..." +chmod 755 ./data/timescale +chmod 755 ./data/redis +chmod 755 ./logs +chmod 755 ./backups + +# Copy environment file if it doesn't exist +if [ ! -f .env ]; then + echo "๐Ÿ“‹ Creating .env file..." + cp .env.example .env + echo "โš ๏ธ Please edit .env file with your specific configuration" + echo "โš ๏ธ Default passwords are set - change them for production!" +fi + +# Pull latest images +echo "๐Ÿ“ฅ Pulling Docker images..." +$DOCKER_COMPOSE -f timescaledb-compose.yml pull + +# Stop existing containers if running +echo "๐Ÿ›‘ Stopping existing containers..." +$DOCKER_COMPOSE -f timescaledb-compose.yml down + +# Start the services +echo "๐Ÿƒ Starting services..." +$DOCKER_COMPOSE -f timescaledb-compose.yml up -d + +# Wait for services to be ready +echo "โณ Waiting for services to be ready..." +sleep 30 + +# Check service health +echo "๐Ÿฅ Checking service health..." + +# Check TimescaleDB +if docker exec market_data_timescaledb pg_isready -U market_user -d market_data; then + echo "โœ… TimescaleDB is ready" +else + echo "โŒ TimescaleDB is not ready" + exit 1 +fi + +# Check Redis +if docker exec market_data_redis redis-cli -a market_data_redis_2024 ping | grep -q PONG; then + echo "โœ… Redis is ready" +else + echo "โŒ Redis is not ready" + exit 1 +fi + +# Display connection information +echo "" +echo "๐ŸŽ‰ Deployment completed successfully!" +echo "" +echo "๐Ÿ“Š Connection Information:" +echo " TimescaleDB:" +echo " Host: 192.168.0.10" +echo " Port: 5432" +echo " Database: market_data" +echo " Username: market_user" +echo " Password: (check .env file)" +echo "" +echo " Redis:" +echo " Host: 192.168.0.10" +echo " Port: 6379" +echo " Password: (check .env file)" +echo "" +echo "๐Ÿ“ Next steps:" +echo " 1. Update your application configuration to use these connection details" +echo " 2. Test the connection from your application" +echo " 3. Set up monitoring and alerting" +echo " 4. 
Configure backup schedules" +echo "" +echo "๐Ÿ” To view logs:" +echo " docker-compose -f timescaledb-compose.yml logs -f" +echo "" +echo "๐Ÿ›‘ To stop services:" +echo " docker-compose -f timescaledb-compose.yml down" \ No newline at end of file diff --git a/COBY/docker/init-scripts/01-init-timescaledb.sql b/COBY/docker/init-scripts/01-init-timescaledb.sql new file mode 100644 index 0000000..7080fb4 --- /dev/null +++ b/COBY/docker/init-scripts/01-init-timescaledb.sql @@ -0,0 +1,214 @@ +-- Initialize TimescaleDB extension and create market data schema +CREATE EXTENSION IF NOT EXISTS timescaledb; + +-- Create database schema for market data +CREATE SCHEMA IF NOT EXISTS market_data; + +-- Set search path +SET search_path TO market_data, public; + +-- Order book snapshots table +CREATE TABLE IF NOT EXISTS order_book_snapshots ( + id BIGSERIAL, + symbol VARCHAR(20) NOT NULL, + exchange VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + bids JSONB NOT NULL, + asks JSONB NOT NULL, + sequence_id BIGINT, + mid_price DECIMAL(20,8), + spread DECIMAL(20,8), + bid_volume DECIMAL(30,8), + ask_volume DECIMAL(30,8), + created_at TIMESTAMPTZ DEFAULT NOW(), + PRIMARY KEY (timestamp, symbol, exchange) +); + +-- Convert to hypertable +SELECT create_hypertable('order_book_snapshots', 'timestamp', if_not_exists => TRUE); + +-- Create indexes for better query performance +CREATE INDEX IF NOT EXISTS idx_order_book_symbol_exchange ON order_book_snapshots (symbol, exchange, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_order_book_timestamp ON order_book_snapshots (timestamp DESC); + +-- Trade events table +CREATE TABLE IF NOT EXISTS trade_events ( + id BIGSERIAL, + symbol VARCHAR(20) NOT NULL, + exchange VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + price DECIMAL(20,8) NOT NULL, + size DECIMAL(30,8) NOT NULL, + side VARCHAR(4) NOT NULL, + trade_id VARCHAR(100) NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW(), + PRIMARY KEY (timestamp, symbol, exchange, trade_id) +); + +-- Convert to hypertable +SELECT create_hypertable('trade_events', 'timestamp', if_not_exists => TRUE); + +-- Create indexes for trade events +CREATE INDEX IF NOT EXISTS idx_trade_events_symbol_exchange ON trade_events (symbol, exchange, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_trade_events_timestamp ON trade_events (timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_trade_events_price ON trade_events (symbol, price, timestamp DESC); + +-- Aggregated heatmap data table +CREATE TABLE IF NOT EXISTS heatmap_data ( + symbol VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + bucket_size DECIMAL(10,2) NOT NULL, + price_bucket DECIMAL(20,8) NOT NULL, + volume DECIMAL(30,8) NOT NULL, + side VARCHAR(3) NOT NULL, + exchange_count INTEGER NOT NULL, + exchanges JSONB, + created_at TIMESTAMPTZ DEFAULT NOW(), + PRIMARY KEY (timestamp, symbol, bucket_size, price_bucket, side) +); + +-- Convert to hypertable +SELECT create_hypertable('heatmap_data', 'timestamp', if_not_exists => TRUE); + +-- Create indexes for heatmap data +CREATE INDEX IF NOT EXISTS idx_heatmap_symbol_bucket ON heatmap_data (symbol, bucket_size, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_heatmap_timestamp ON heatmap_data (timestamp DESC); + +-- OHLCV data table +CREATE TABLE IF NOT EXISTS ohlcv_data ( + symbol VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + timeframe VARCHAR(10) NOT NULL, + open_price DECIMAL(20,8) NOT NULL, + high_price DECIMAL(20,8) NOT NULL, + low_price DECIMAL(20,8) NOT NULL, + close_price DECIMAL(20,8) NOT NULL, + volume 
DECIMAL(30,8) NOT NULL, + trade_count INTEGER, + vwap DECIMAL(20,8), + created_at TIMESTAMPTZ DEFAULT NOW(), + PRIMARY KEY (timestamp, symbol, timeframe) +); + +-- Convert to hypertable +SELECT create_hypertable('ohlcv_data', 'timestamp', if_not_exists => TRUE); + +-- Create indexes for OHLCV data +CREATE INDEX IF NOT EXISTS idx_ohlcv_symbol_timeframe ON ohlcv_data (symbol, timeframe, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_ohlcv_timestamp ON ohlcv_data (timestamp DESC); + +-- Exchange status tracking table +CREATE TABLE IF NOT EXISTS exchange_status ( + exchange VARCHAR(20) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + status VARCHAR(20) NOT NULL, -- 'connected', 'disconnected', 'error' + last_message_time TIMESTAMPTZ, + error_message TEXT, + connection_count INTEGER DEFAULT 0, + created_at TIMESTAMPTZ DEFAULT NOW(), + PRIMARY KEY (timestamp, exchange) +); + +-- Convert to hypertable +SELECT create_hypertable('exchange_status', 'timestamp', if_not_exists => TRUE); + +-- Create indexes for exchange status +CREATE INDEX IF NOT EXISTS idx_exchange_status_exchange ON exchange_status (exchange, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_exchange_status_timestamp ON exchange_status (timestamp DESC); + +-- System metrics table for monitoring +CREATE TABLE IF NOT EXISTS system_metrics ( + metric_name VARCHAR(50) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + value DECIMAL(20,8) NOT NULL, + labels JSONB, + created_at TIMESTAMPTZ DEFAULT NOW(), + PRIMARY KEY (timestamp, metric_name) +); + +-- Convert to hypertable +SELECT create_hypertable('system_metrics', 'timestamp', if_not_exists => TRUE); + +-- Create indexes for system metrics +CREATE INDEX IF NOT EXISTS idx_system_metrics_name ON system_metrics (metric_name, timestamp DESC); +CREATE INDEX IF NOT EXISTS idx_system_metrics_timestamp ON system_metrics (timestamp DESC); + +-- Create retention policies (keep data for 90 days by default) +SELECT add_retention_policy('order_book_snapshots', INTERVAL '90 days', if_not_exists => TRUE); +SELECT add_retention_policy('trade_events', INTERVAL '90 days', if_not_exists => TRUE); +SELECT add_retention_policy('heatmap_data', INTERVAL '90 days', if_not_exists => TRUE); +SELECT add_retention_policy('ohlcv_data', INTERVAL '365 days', if_not_exists => TRUE); +SELECT add_retention_policy('exchange_status', INTERVAL '30 days', if_not_exists => TRUE); +SELECT add_retention_policy('system_metrics', INTERVAL '30 days', if_not_exists => TRUE); + +-- Create continuous aggregates for common queries +CREATE MATERIALIZED VIEW IF NOT EXISTS hourly_ohlcv +WITH (timescaledb.continuous) AS +SELECT + symbol, + exchange, + time_bucket('1 hour', timestamp) AS hour, + first(price, timestamp) AS open_price, + max(price) AS high_price, + min(price) AS low_price, + last(price, timestamp) AS close_price, + sum(size) AS volume, + count(*) AS trade_count, + avg(price) AS vwap +FROM trade_events +GROUP BY symbol, exchange, hour +WITH NO DATA; + +-- Add refresh policy for continuous aggregate +SELECT add_continuous_aggregate_policy('hourly_ohlcv', + start_offset => INTERVAL '3 hours', + end_offset => INTERVAL '1 hour', + schedule_interval => INTERVAL '1 hour', + if_not_exists => TRUE); + +-- Create view for latest order book data +CREATE OR REPLACE VIEW latest_order_books AS +SELECT DISTINCT ON (symbol, exchange) + symbol, + exchange, + timestamp, + bids, + asks, + mid_price, + spread, + bid_volume, + ask_volume +FROM order_book_snapshots +ORDER BY symbol, exchange, timestamp DESC; + +-- Create view for latest heatmap data 
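+-- DISTINCT ON combined with the ORDER BY below keeps only the most recent row per (symbol, bucket_size, price_bucket, side).
+-- Example consumer query (sketch; 'BTCUSDT' and 1.0 are illustrative values):
+--   SELECT * FROM latest_heatmaps WHERE symbol = 'BTCUSDT' AND bucket_size = 1.0;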
+CREATE OR REPLACE VIEW latest_heatmaps AS +SELECT DISTINCT ON (symbol, bucket_size, price_bucket, side) + symbol, + bucket_size, + price_bucket, + side, + timestamp, + volume, + exchange_count, + exchanges +FROM heatmap_data +ORDER BY symbol, bucket_size, price_bucket, side, timestamp DESC; + +-- Grant permissions to market_user +GRANT ALL PRIVILEGES ON SCHEMA market_data TO market_user; +GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA market_data TO market_user; +GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA market_data TO market_user; +GRANT ALL PRIVILEGES ON ALL FUNCTIONS IN SCHEMA market_data TO market_user; + +-- Set default privileges for future objects +ALTER DEFAULT PRIVILEGES IN SCHEMA market_data GRANT ALL ON TABLES TO market_user; +ALTER DEFAULT PRIVILEGES IN SCHEMA market_data GRANT ALL ON SEQUENCES TO market_user; +ALTER DEFAULT PRIVILEGES IN SCHEMA market_data GRANT ALL ON FUNCTIONS TO market_user; + +-- Create database user for read-only access (for dashboards) +CREATE USER IF NOT EXISTS dashboard_user WITH PASSWORD 'dashboard_read_2024'; +GRANT CONNECT ON DATABASE market_data TO dashboard_user; +GRANT USAGE ON SCHEMA market_data TO dashboard_user; +GRANT SELECT ON ALL TABLES IN SCHEMA market_data TO dashboard_user; +ALTER DEFAULT PRIVILEGES IN SCHEMA market_data GRANT SELECT ON TABLES TO dashboard_user; \ No newline at end of file diff --git a/COBY/docker/manual-init.sh b/COBY/docker/manual-init.sh new file mode 100644 index 0000000..5ece539 --- /dev/null +++ b/COBY/docker/manual-init.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +# Manual database initialization script +# Run this to initialize the TimescaleDB schema + +echo "๐Ÿ”ง Initializing TimescaleDB schema..." + +# Check if we can connect to the database +echo "๐Ÿ“ก Testing connection to TimescaleDB..." + +# You can run this command on your Docker host (192.168.0.10) +# Replace with your actual password from the .env file + +PGPASSWORD="market_data_secure_pass_2024" psql -h 192.168.0.10 -p 5432 -U market_user -d market_data -c "SELECT version();" + +if [ $? -eq 0 ]; then + echo "โœ… Connection successful!" + + echo "๐Ÿ—๏ธ Creating database schema..." + + # Execute the initialization script + PGPASSWORD="market_data_secure_pass_2024" psql -h 192.168.0.10 -p 5432 -U market_user -d market_data -f ../docker/init-scripts/01-init-timescaledb.sql + + if [ $? -eq 0 ]; then + echo "โœ… Database schema initialized successfully!" + + echo "๐Ÿ“Š Verifying tables..." 
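+    # \dt market_data.* lists the tables created by 01-init-timescaledb.sql in the market_data schema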
+ PGPASSWORD="market_data_secure_pass_2024" psql -h 192.168.0.10 -p 5432 -U market_user -d market_data -c "\dt market_data.*" + + else + echo "โŒ Schema initialization failed" + exit 1 + fi +else + echo "โŒ Cannot connect to database" + exit 1 +fi \ No newline at end of file diff --git a/COBY/docker/redis.conf b/COBY/docker/redis.conf new file mode 100644 index 0000000..3b4ffd9 --- /dev/null +++ b/COBY/docker/redis.conf @@ -0,0 +1,131 @@ +# Redis configuration for market data caching +# Optimized for high-frequency trading data + +# Network settings +bind 0.0.0.0 +port 6379 +tcp-backlog 511 +timeout 0 +tcp-keepalive 300 + +# General settings +daemonize no +supervised no +pidfile /var/run/redis_6379.pid +loglevel notice +logfile "" +databases 16 + +# Snapshotting (persistence) +save 900 1 +save 300 10 +save 60 10000 +stop-writes-on-bgsave-error yes +rdbcompression yes +rdbchecksum yes +dbfilename dump.rdb +dir /data + +# Replication +replica-serve-stale-data yes +replica-read-only yes +repl-diskless-sync no +repl-diskless-sync-delay 5 +repl-ping-replica-period 10 +repl-timeout 60 +repl-disable-tcp-nodelay no +repl-backlog-size 1mb +repl-backlog-ttl 3600 + +# Security +requirepass market_data_redis_2024 + +# Memory management +maxmemory 2gb +maxmemory-policy allkeys-lru +maxmemory-samples 5 + +# Lazy freeing +lazyfree-lazy-eviction no +lazyfree-lazy-expire no +lazyfree-lazy-server-del no +replica-lazy-flush no + +# Threaded I/O +io-threads 4 +io-threads-do-reads yes + +# Append only file (AOF) +appendonly yes +appendfilename "appendonly.aof" +appendfsync everysec +no-appendfsync-on-rewrite no +auto-aof-rewrite-percentage 100 +auto-aof-rewrite-min-size 64mb +aof-load-truncated yes +aof-use-rdb-preamble yes + +# Lua scripting +lua-time-limit 5000 + +# Slow log +slowlog-log-slower-than 10000 +slowlog-max-len 128 + +# Latency monitor +latency-monitor-threshold 100 + +# Event notification +notify-keyspace-events "" + +# Hash settings (optimized for order book data) +hash-max-ziplist-entries 512 +hash-max-ziplist-value 64 + +# List settings +list-max-ziplist-size -2 +list-compress-depth 0 + +# Set settings +set-max-intset-entries 512 + +# Sorted set settings +zset-max-ziplist-entries 128 +zset-max-ziplist-value 64 + +# HyperLogLog settings +hll-sparse-max-bytes 3000 + +# Streams settings +stream-node-max-bytes 4096 +stream-node-max-entries 100 + +# Active rehashing +activerehashing yes + +# Client settings +client-output-buffer-limit normal 0 0 0 +client-output-buffer-limit replica 256mb 64mb 60 +client-output-buffer-limit pubsub 32mb 8mb 60 +client-query-buffer-limit 1gb + +# Protocol settings +proto-max-bulk-len 512mb + +# Frequency settings +hz 10 + +# Dynamic HZ +dynamic-hz yes + +# AOF rewrite settings +aof-rewrite-incremental-fsync yes + +# RDB settings +rdb-save-incremental-fsync yes + +# Jemalloc settings +jemalloc-bg-thread yes + +# TLS settings (disabled for internal network) +tls-port 0 \ No newline at end of file diff --git a/COBY/docker/restore.sh b/COBY/docker/restore.sh new file mode 100644 index 0000000..8138699 --- /dev/null +++ b/COBY/docker/restore.sh @@ -0,0 +1,188 @@ +#!/bin/bash + +# Restore script for market data infrastructure +# Usage: ./restore.sh + +set -e + +# Check if backup file is provided +if [ $# -eq 0 ]; then + echo "โŒ Usage: $0 " + echo "Available backups:" + ls -la ./backups/market_data_backup_*.tar.gz 2>/dev/null || echo "No backups found" + exit 1 +fi + +BACKUP_FILE="$1" +RESTORE_DIR="./restore_temp" +TIMESTAMP=$(date +"%Y%m%d_%H%M%S") + +# Load 
environment variables +if [ -f .env ]; then + source .env +fi + +echo "๐Ÿ”„ Starting restore process..." +echo "๐Ÿ“ Backup file: $BACKUP_FILE" + +# Check if backup file exists +if [ ! -f "$BACKUP_FILE" ]; then + echo "โŒ Backup file not found: $BACKUP_FILE" + exit 1 +fi + +# Create temporary restore directory +mkdir -p "$RESTORE_DIR" + +# Extract backup +echo "๐Ÿ“ฆ Extracting backup..." +tar -xzf "$BACKUP_FILE" -C "$RESTORE_DIR" + +# Find extracted files +TIMESCALE_BACKUP=$(find "$RESTORE_DIR" -name "timescaledb_backup_*.dump" | head -1) +REDIS_BACKUP=$(find "$RESTORE_DIR" -name "redis_backup_*.rdb" | head -1) +BACKUP_INFO=$(find "$RESTORE_DIR" -name "backup_*.info" | head -1) + +if [ -z "$TIMESCALE_BACKUP" ] || [ -z "$REDIS_BACKUP" ]; then + echo "โŒ Invalid backup file structure" + rm -rf "$RESTORE_DIR" + exit 1 +fi + +# Display backup information +if [ -f "$BACKUP_INFO" ]; then + echo "๐Ÿ“‹ Backup Information:" + cat "$BACKUP_INFO" + echo "" +fi + +# Confirm restore +read -p "โš ๏ธ This will replace all existing data. Continue? (y/N): " -n 1 -r +echo +if [[ ! $REPLY =~ ^[Yy]$ ]]; then + echo "โŒ Restore cancelled" + rm -rf "$RESTORE_DIR" + exit 1 +fi + +# Stop services +echo "๐Ÿ›‘ Stopping services..." +docker-compose -f timescaledb-compose.yml down + +# Backup current data (just in case) +echo "๐Ÿ’พ Creating safety backup of current data..." +mkdir -p "./backups/pre_restore_$TIMESTAMP" +docker run --rm -v market_data_timescale_data:/data -v "$(pwd)/backups/pre_restore_$TIMESTAMP":/backup alpine tar czf /backup/current_timescale.tar.gz -C /data . +docker run --rm -v market_data_redis_data:/data -v "$(pwd)/backups/pre_restore_$TIMESTAMP":/backup alpine tar czf /backup/current_redis.tar.gz -C /data . + +# Start only TimescaleDB for restore +echo "๐Ÿƒ Starting TimescaleDB for restore..." +docker-compose -f timescaledb-compose.yml up -d timescaledb + +# Wait for TimescaleDB to be ready +echo "โณ Waiting for TimescaleDB to be ready..." +sleep 30 + +# Check if TimescaleDB is ready +if ! docker exec market_data_timescaledb pg_isready -U market_user -d market_data; then + echo "โŒ TimescaleDB is not ready" + exit 1 +fi + +# Drop existing database and recreate +echo "๐Ÿ—‘๏ธ Dropping existing database..." +docker exec market_data_timescaledb psql -U postgres -c "DROP DATABASE IF EXISTS market_data;" +docker exec market_data_timescaledb psql -U postgres -c "CREATE DATABASE market_data OWNER market_user;" + +# Restore TimescaleDB +echo "๐Ÿ“Š Restoring TimescaleDB..." +docker cp "$TIMESCALE_BACKUP" market_data_timescaledb:/tmp/restore.dump +docker exec market_data_timescaledb pg_restore \ + -U market_user \ + -d market_data \ + --verbose \ + --no-password \ + /tmp/restore.dump + +if [ $? -eq 0 ]; then + echo "โœ… TimescaleDB restore completed" +else + echo "โŒ TimescaleDB restore failed" + exit 1 +fi + +# Stop TimescaleDB +docker-compose -f timescaledb-compose.yml stop timescaledb + +# Restore Redis data +echo "๐Ÿ“ฆ Restoring Redis data..." +# Remove existing Redis data +docker volume rm market_data_redis_data 2>/dev/null || true +docker volume create market_data_redis_data + +# Copy Redis backup to volume +docker run --rm -v market_data_redis_data:/data -v "$(pwd)/$RESTORE_DIR":/backup alpine cp "/backup/$(basename "$REDIS_BACKUP")" /data/dump.rdb + +# Start all services +echo "๐Ÿƒ Starting all services..." +docker-compose -f timescaledb-compose.yml up -d + +# Wait for services to be ready +echo "โณ Waiting for services to be ready..." 
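+# A fixed sleep follows; a hedged alternative is to poll readiness instead of sleeping, e.g.:
+#   until docker exec market_data_timescaledb pg_isready -U market_user -d market_data > /dev/null 2>&1; do sleep 2; done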
+sleep 30 + +# Verify restore +echo "๐Ÿ” Verifying restore..." + +# Check TimescaleDB +if docker exec market_data_timescaledb pg_isready -U market_user -d market_data; then + echo "โœ… TimescaleDB is ready" + + # Show table counts + echo "๐Ÿ“Š Database table counts:" + docker exec market_data_timescaledb psql -U market_user -d market_data -c " + SELECT + schemaname, + tablename, + n_tup_ins as row_count + FROM pg_stat_user_tables + WHERE schemaname = 'market_data' + ORDER BY tablename; + " +else + echo "โŒ TimescaleDB verification failed" + exit 1 +fi + +# Check Redis +if docker exec market_data_redis redis-cli -a "$REDIS_PASSWORD" ping | grep -q PONG; then + echo "โœ… Redis is ready" + + # Show Redis info + echo "๐Ÿ“ฆ Redis database info:" + docker exec market_data_redis redis-cli -a "$REDIS_PASSWORD" INFO keyspace +else + echo "โŒ Redis verification failed" + exit 1 +fi + +# Clean up +echo "๐Ÿงน Cleaning up temporary files..." +rm -rf "$RESTORE_DIR" + +echo "" +echo "๐ŸŽ‰ Restore completed successfully!" +echo "" +echo "๐Ÿ“‹ Restore Summary:" +echo " Source: $BACKUP_FILE" +echo " Timestamp: $TIMESTAMP" +echo " Safety backup: ./backups/pre_restore_$TIMESTAMP/" +echo "" +echo "โš ๏ธ If you encounter any issues, you can restore the safety backup:" +echo " docker-compose -f timescaledb-compose.yml down" +echo " docker volume rm market_data_timescale_data market_data_redis_data" +echo " docker volume create market_data_timescale_data" +echo " docker volume create market_data_redis_data" +echo " docker run --rm -v market_data_timescale_data:/data -v $(pwd)/backups/pre_restore_$TIMESTAMP:/backup alpine tar xzf /backup/current_timescale.tar.gz -C /data" +echo " docker run --rm -v market_data_redis_data:/data -v $(pwd)/backups/pre_restore_$TIMESTAMP:/backup alpine tar xzf /backup/current_redis.tar.gz -C /data" +echo " docker-compose -f timescaledb-compose.yml up -d" \ No newline at end of file diff --git a/COBY/docker/timescaledb-compose.yml b/COBY/docker/timescaledb-compose.yml new file mode 100644 index 0000000..0c249a9 --- /dev/null +++ b/COBY/docker/timescaledb-compose.yml @@ -0,0 +1,78 @@ +version: '3.8' + +services: + timescaledb: + image: timescale/timescaledb:latest-pg15 + container_name: market_data_timescaledb + restart: unless-stopped + environment: + POSTGRES_DB: market_data + POSTGRES_USER: market_user + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-market_data_secure_pass_2024} + POSTGRES_INITDB_ARGS: "--encoding=UTF-8 --lc-collate=C --lc-ctype=C" + # TimescaleDB specific settings + TIMESCALEDB_TELEMETRY: 'off' + ports: + - "5432:5432" + volumes: + - timescale_data:/var/lib/postgresql/data + - ./init-scripts:/docker-entrypoint-initdb.d + command: > + postgres + -c shared_preload_libraries=timescaledb + -c max_connections=200 + -c shared_buffers=256MB + -c effective_cache_size=1GB + -c maintenance_work_mem=64MB + -c checkpoint_completion_target=0.9 + -c wal_buffers=16MB + -c default_statistics_target=100 + -c random_page_cost=1.1 + -c effective_io_concurrency=200 + -c work_mem=4MB + -c min_wal_size=1GB + -c max_wal_size=4GB + -c max_worker_processes=8 + -c max_parallel_workers_per_gather=4 + -c max_parallel_workers=8 + -c max_parallel_maintenance_workers=4 + healthcheck: + test: ["CMD-SHELL", "pg_isready -U market_user -d market_data"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + networks: + - market_data_network + + redis: + image: redis:7-alpine + container_name: market_data_redis + restart: unless-stopped + ports: + - "6379:6379" + volumes: + - redis_data:/data 
+ - ./redis.conf:/usr/local/etc/redis/redis.conf + command: redis-server /usr/local/etc/redis/redis.conf + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 30s + networks: + - market_data_network + +volumes: + timescale_data: + driver: local + redis_data: + driver: local + +networks: + market_data_network: + driver: bridge + ipam: + config: + - subnet: 172.20.0.0/16 \ No newline at end of file diff --git a/COBY/interfaces/__init__.py b/COBY/interfaces/__init__.py new file mode 100644 index 0000000..58ceaef --- /dev/null +++ b/COBY/interfaces/__init__.py @@ -0,0 +1,17 @@ +""" +Interface definitions for the multi-exchange data aggregation system. +""" + +from .exchange_connector import ExchangeConnector +from .data_processor import DataProcessor +from .aggregation_engine import AggregationEngine +from .storage_manager import StorageManager +from .replay_manager import ReplayManager + +__all__ = [ + 'ExchangeConnector', + 'DataProcessor', + 'AggregationEngine', + 'StorageManager', + 'ReplayManager' +] \ No newline at end of file diff --git a/COBY/interfaces/aggregation_engine.py b/COBY/interfaces/aggregation_engine.py new file mode 100644 index 0000000..5b032ee --- /dev/null +++ b/COBY/interfaces/aggregation_engine.py @@ -0,0 +1,139 @@ +""" +Interface for data aggregation and heatmap generation. +""" + +from abc import ABC, abstractmethod +from typing import Dict, List +from ..models.core import ( + OrderBookSnapshot, PriceBuckets, HeatmapData, + ImbalanceMetrics, ConsolidatedOrderBook +) + + +class AggregationEngine(ABC): + """Aggregates data into price buckets and heatmaps""" + + @abstractmethod + def create_price_buckets(self, orderbook: OrderBookSnapshot, + bucket_size: float) -> PriceBuckets: + """ + Convert order book data to price buckets. + + Args: + orderbook: Order book snapshot + bucket_size: Size of each price bucket + + Returns: + PriceBuckets: Aggregated price bucket data + """ + pass + + @abstractmethod + def update_heatmap(self, symbol: str, buckets: PriceBuckets) -> HeatmapData: + """ + Update heatmap data with new price buckets. + + Args: + symbol: Trading symbol + buckets: Price bucket data + + Returns: + HeatmapData: Updated heatmap visualization data + """ + pass + + @abstractmethod + def calculate_imbalances(self, orderbook: OrderBookSnapshot) -> ImbalanceMetrics: + """ + Calculate order book imbalance metrics. + + Args: + orderbook: Order book snapshot + + Returns: + ImbalanceMetrics: Calculated imbalance metrics + """ + pass + + @abstractmethod + def aggregate_across_exchanges(self, symbol: str, + orderbooks: List[OrderBookSnapshot]) -> ConsolidatedOrderBook: + """ + Aggregate order book data from multiple exchanges. + + Args: + symbol: Trading symbol + orderbooks: List of order book snapshots from different exchanges + + Returns: + ConsolidatedOrderBook: Consolidated order book data + """ + pass + + @abstractmethod + def calculate_volume_weighted_price(self, orderbooks: List[OrderBookSnapshot]) -> float: + """ + Calculate volume-weighted average price across exchanges. + + Args: + orderbooks: List of order book snapshots + + Returns: + float: Volume-weighted average price + """ + pass + + @abstractmethod + def get_market_depth(self, orderbook: OrderBookSnapshot, + depth_levels: List[float]) -> Dict[float, Dict[str, float]]: + """ + Calculate market depth at different price levels. 
+ + Args: + orderbook: Order book snapshot + depth_levels: List of depth percentages (e.g., [0.1, 0.5, 1.0]) + + Returns: + Dict: Market depth data {level: {'bid_volume': x, 'ask_volume': y}} + """ + pass + + @abstractmethod + def smooth_heatmap(self, heatmap: HeatmapData, smoothing_factor: float) -> HeatmapData: + """ + Apply smoothing to heatmap data to reduce noise. + + Args: + heatmap: Raw heatmap data + smoothing_factor: Smoothing factor (0.0 to 1.0) + + Returns: + HeatmapData: Smoothed heatmap data + """ + pass + + @abstractmethod + def calculate_liquidity_score(self, orderbook: OrderBookSnapshot) -> float: + """ + Calculate liquidity score for an order book. + + Args: + orderbook: Order book snapshot + + Returns: + float: Liquidity score (0.0 to 1.0) + """ + pass + + @abstractmethod + def detect_support_resistance(self, heatmap: HeatmapData) -> Dict[str, List[float]]: + """ + Detect support and resistance levels from heatmap data. + + Args: + heatmap: Heatmap data + + Returns: + Dict: {'support': [prices], 'resistance': [prices]} + """ + pass \ No newline at end of file diff --git a/COBY/interfaces/data_processor.py b/COBY/interfaces/data_processor.py new file mode 100644 index 0000000..7e0ddb6 --- /dev/null +++ b/COBY/interfaces/data_processor.py @@ -0,0 +1,119 @@ +""" +Interface for data processing and normalization. +""" + +from abc import ABC, abstractmethod +from typing import Dict, Union, List, Optional +from ..models.core import OrderBookSnapshot, TradeEvent, OrderBookMetrics + + +class DataProcessor(ABC): + """Processes and normalizes raw exchange data""" + + @abstractmethod + def normalize_orderbook(self, raw_data: Dict, exchange: str) -> OrderBookSnapshot: + """ + Normalize raw order book data to standard format. + + Args: + raw_data: Raw order book data from exchange + exchange: Exchange name + + Returns: + OrderBookSnapshot: Normalized order book data + """ + pass + + @abstractmethod + def normalize_trade(self, raw_data: Dict, exchange: str) -> TradeEvent: + """ + Normalize raw trade data to standard format. + + Args: + raw_data: Raw trade data from exchange + exchange: Exchange name + + Returns: + TradeEvent: Normalized trade data + """ + pass + + @abstractmethod + def validate_data(self, data: Union[OrderBookSnapshot, TradeEvent]) -> bool: + """ + Validate normalized data for quality and consistency. + + Args: + data: Normalized data to validate + + Returns: + bool: True if data is valid, False otherwise + """ + pass + + @abstractmethod + def calculate_metrics(self, orderbook: OrderBookSnapshot) -> OrderBookMetrics: + """ + Calculate metrics from order book data. + + Args: + orderbook: Order book snapshot + + Returns: + OrderBookMetrics: Calculated metrics + """ + pass + + @abstractmethod + def detect_anomalies(self, data: Union[OrderBookSnapshot, TradeEvent]) -> List[str]: + """ + Detect anomalies in the data. + + Args: + data: Data to analyze for anomalies + + Returns: + List[str]: List of detected anomaly descriptions + """ + pass + + @abstractmethod + def filter_data(self, data: Union[OrderBookSnapshot, TradeEvent], + criteria: Dict) -> bool: + """ + Filter data based on criteria. + + Args: + data: Data to filter + criteria: Filtering criteria + + Returns: + bool: True if data passes filter, False otherwise + """ + pass + + @abstractmethod + def enrich_data(self, data: Union[OrderBookSnapshot, TradeEvent]) -> Dict: + """ + Enrich data with additional metadata. 
+ + Args: + data: Data to enrich + + Returns: + Dict: Enriched data with metadata + """ + pass + + @abstractmethod + def get_data_quality_score(self, data: Union[OrderBookSnapshot, TradeEvent]) -> float: + """ + Calculate data quality score. + + Args: + data: Data to score + + Returns: + float: Quality score between 0.0 and 1.0 + """ + pass \ No newline at end of file diff --git a/COBY/interfaces/exchange_connector.py b/COBY/interfaces/exchange_connector.py new file mode 100644 index 0000000..cf434fa --- /dev/null +++ b/COBY/interfaces/exchange_connector.py @@ -0,0 +1,189 @@ +""" +Base interface for exchange WebSocket connectors. +""" + +from abc import ABC, abstractmethod +from typing import Callable, List, Optional +from ..models.core import ConnectionStatus, OrderBookSnapshot, TradeEvent + + +class ExchangeConnector(ABC): + """Base interface for exchange WebSocket connectors""" + + def __init__(self, exchange_name: str): + self.exchange_name = exchange_name + self._data_callbacks: List[Callable] = [] + self._status_callbacks: List[Callable] = [] + self._connection_status = ConnectionStatus.DISCONNECTED + + @abstractmethod + async def connect(self) -> bool: + """ + Establish connection to the exchange WebSocket. + + Returns: + bool: True if connection successful, False otherwise + """ + pass + + @abstractmethod + async def disconnect(self) -> None: + """Disconnect from the exchange WebSocket.""" + pass + + @abstractmethod + async def subscribe_orderbook(self, symbol: str) -> None: + """ + Subscribe to order book updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + pass + + @abstractmethod + async def subscribe_trades(self, symbol: str) -> None: + """ + Subscribe to trade updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + pass + + @abstractmethod + async def unsubscribe_orderbook(self, symbol: str) -> None: + """ + Unsubscribe from order book updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + pass + + @abstractmethod + async def unsubscribe_trades(self, symbol: str) -> None: + """ + Unsubscribe from trade updates for a symbol. + + Args: + symbol: Trading symbol (e.g., 'BTCUSDT') + """ + pass + + def get_connection_status(self) -> ConnectionStatus: + """ + Get current connection status. + + Returns: + ConnectionStatus: Current connection status + """ + return self._connection_status + + def add_data_callback(self, callback: Callable) -> None: + """ + Add callback for data updates. + + Args: + callback: Function to call when data is received + Signature: callback(data: Union[OrderBookSnapshot, TradeEvent]) + """ + if callback not in self._data_callbacks: + self._data_callbacks.append(callback) + + def remove_data_callback(self, callback: Callable) -> None: + """ + Remove data callback. + + Args: + callback: Callback function to remove + """ + if callback in self._data_callbacks: + self._data_callbacks.remove(callback) + + def add_status_callback(self, callback: Callable) -> None: + """ + Add callback for status updates. + + Args: + callback: Function to call when status changes + Signature: callback(exchange: str, status: ConnectionStatus) + """ + if callback not in self._status_callbacks: + self._status_callbacks.append(callback) + + def remove_status_callback(self, callback: Callable) -> None: + """ + Remove status callback. 
+ + Args: + callback: Callback function to remove + """ + if callback in self._status_callbacks: + self._status_callbacks.remove(callback) + + def _notify_data_callbacks(self, data): + """Notify all data callbacks of new data.""" + for callback in self._data_callbacks: + try: + callback(data) + except Exception as e: + # Log error but don't stop other callbacks + print(f"Error in data callback: {e}") + + def _notify_status_callbacks(self, status: ConnectionStatus): + """Notify all status callbacks of status change.""" + self._connection_status = status + for callback in self._status_callbacks: + try: + callback(self.exchange_name, status) + except Exception as e: + # Log error but don't stop other callbacks + print(f"Error in status callback: {e}") + + @abstractmethod + async def get_symbols(self) -> List[str]: + """ + Get list of available trading symbols. + + Returns: + List[str]: List of available symbols + """ + pass + + @abstractmethod + def normalize_symbol(self, symbol: str) -> str: + """ + Normalize symbol to exchange format. + + Args: + symbol: Standard symbol format (e.g., 'BTCUSDT') + + Returns: + str: Exchange-specific symbol format + """ + pass + + @abstractmethod + async def get_orderbook_snapshot(self, symbol: str, depth: int = 20) -> Optional[OrderBookSnapshot]: + """ + Get current order book snapshot. + + Args: + symbol: Trading symbol + depth: Number of price levels to retrieve + + Returns: + OrderBookSnapshot: Current order book or None if unavailable + """ + pass + + @property + def name(self) -> str: + """Get exchange name.""" + return self.exchange_name + + @property + def is_connected(self) -> bool: + """Check if connector is connected.""" + return self._connection_status == ConnectionStatus.CONNECTED \ No newline at end of file diff --git a/COBY/interfaces/replay_manager.py b/COBY/interfaces/replay_manager.py new file mode 100644 index 0000000..0b78c31 --- /dev/null +++ b/COBY/interfaces/replay_manager.py @@ -0,0 +1,212 @@ +""" +Interface for historical data replay functionality. +""" + +from abc import ABC, abstractmethod +from datetime import datetime +from typing import List, Optional, Callable, Dict, Any +from ..models.core import ReplaySession, ReplayStatus + + +class ReplayManager(ABC): + """Provides historical data replay functionality""" + + @abstractmethod + def create_replay_session(self, start_time: datetime, end_time: datetime, + speed: float = 1.0, symbols: Optional[List[str]] = None, + exchanges: Optional[List[str]] = None) -> str: + """ + Create a new replay session. + + Args: + start_time: Replay start time + end_time: Replay end time + speed: Playback speed multiplier (1.0 = real-time) + symbols: List of symbols to replay (None = all) + exchanges: List of exchanges to replay (None = all) + + Returns: + str: Session ID + """ + pass + + @abstractmethod + async def start_replay(self, session_id: str) -> None: + """ + Start replay session. + + Args: + session_id: Session ID to start + """ + pass + + @abstractmethod + async def pause_replay(self, session_id: str) -> None: + """ + Pause replay session. + + Args: + session_id: Session ID to pause + """ + pass + + @abstractmethod + async def resume_replay(self, session_id: str) -> None: + """ + Resume paused replay session. + + Args: + session_id: Session ID to resume + """ + pass + + @abstractmethod + async def stop_replay(self, session_id: str) -> None: + """ + Stop replay session. 
+ + Args: + session_id: Session ID to stop + """ + pass + + @abstractmethod + def get_replay_status(self, session_id: str) -> Optional[ReplaySession]: + """ + Get replay session status. + + Args: + session_id: Session ID + + Returns: + ReplaySession: Session status or None if not found + """ + pass + + @abstractmethod + def list_replay_sessions(self) -> List[ReplaySession]: + """ + List all replay sessions. + + Returns: + List[ReplaySession]: List of all sessions + """ + pass + + @abstractmethod + def delete_replay_session(self, session_id: str) -> bool: + """ + Delete replay session. + + Args: + session_id: Session ID to delete + + Returns: + bool: True if deleted successfully, False otherwise + """ + pass + + @abstractmethod + def set_replay_speed(self, session_id: str, speed: float) -> bool: + """ + Change replay speed for active session. + + Args: + session_id: Session ID + speed: New playback speed multiplier + + Returns: + bool: True if speed changed successfully, False otherwise + """ + pass + + @abstractmethod + def seek_replay(self, session_id: str, timestamp: datetime) -> bool: + """ + Seek to specific timestamp in replay. + + Args: + session_id: Session ID + timestamp: Target timestamp + + Returns: + bool: True if seek successful, False otherwise + """ + pass + + @abstractmethod + def add_data_callback(self, session_id: str, callback: Callable) -> bool: + """ + Add callback for replay data. + + Args: + session_id: Session ID + callback: Function to call with replay data + Signature: callback(data: Union[OrderBookSnapshot, TradeEvent]) + + Returns: + bool: True if callback added successfully, False otherwise + """ + pass + + @abstractmethod + def remove_data_callback(self, session_id: str, callback: Callable) -> bool: + """ + Remove data callback from replay session. + + Args: + session_id: Session ID + callback: Callback function to remove + + Returns: + bool: True if callback removed successfully, False otherwise + """ + pass + + @abstractmethod + def add_status_callback(self, session_id: str, callback: Callable) -> bool: + """ + Add callback for replay status changes. + + Args: + session_id: Session ID + callback: Function to call on status change + Signature: callback(session_id: str, status: ReplayStatus) + + Returns: + bool: True if callback added successfully, False otherwise + """ + pass + + @abstractmethod + async def get_available_data_range(self, symbol: str, + exchange: Optional[str] = None) -> Optional[Dict[str, datetime]]: + """ + Get available data time range for replay. + + Args: + symbol: Trading symbol + exchange: Exchange name (None = all exchanges) + + Returns: + Dict: {'start': datetime, 'end': datetime} or None if no data + """ + pass + + @abstractmethod + def validate_replay_request(self, start_time: datetime, end_time: datetime, + symbols: Optional[List[str]] = None, + exchanges: Optional[List[str]] = None) -> List[str]: + """ + Validate replay request parameters. + + Args: + start_time: Requested start time + end_time: Requested end time + symbols: Requested symbols + exchanges: Requested exchanges + + Returns: + List[str]: List of validation errors (empty if valid) + """ + pass \ No newline at end of file diff --git a/COBY/interfaces/storage_manager.py b/COBY/interfaces/storage_manager.py new file mode 100644 index 0000000..63ec849 --- /dev/null +++ b/COBY/interfaces/storage_manager.py @@ -0,0 +1,215 @@ +""" +Interface for data storage and retrieval. 
+""" + +from abc import ABC, abstractmethod +from datetime import datetime +from typing import List, Dict, Optional, Any +from ..models.core import OrderBookSnapshot, TradeEvent, HeatmapData, SystemMetrics + + +class StorageManager(ABC): + """Manages data persistence and retrieval""" + + @abstractmethod + async def store_orderbook(self, data: OrderBookSnapshot) -> bool: + """ + Store order book snapshot to database. + + Args: + data: Order book snapshot to store + + Returns: + bool: True if stored successfully, False otherwise + """ + pass + + @abstractmethod + async def store_trade(self, data: TradeEvent) -> bool: + """ + Store trade event to database. + + Args: + data: Trade event to store + + Returns: + bool: True if stored successfully, False otherwise + """ + pass + + @abstractmethod + async def store_heatmap(self, data: HeatmapData) -> bool: + """ + Store heatmap data to database. + + Args: + data: Heatmap data to store + + Returns: + bool: True if stored successfully, False otherwise + """ + pass + + @abstractmethod + async def store_metrics(self, data: SystemMetrics) -> bool: + """ + Store system metrics to database. + + Args: + data: System metrics to store + + Returns: + bool: True if stored successfully, False otherwise + """ + pass + + @abstractmethod + async def get_historical_orderbooks(self, symbol: str, exchange: str, + start: datetime, end: datetime, + limit: Optional[int] = None) -> List[OrderBookSnapshot]: + """ + Retrieve historical order book data. + + Args: + symbol: Trading symbol + exchange: Exchange name + start: Start timestamp + end: End timestamp + limit: Maximum number of records to return + + Returns: + List[OrderBookSnapshot]: Historical order book data + """ + pass + + @abstractmethod + async def get_historical_trades(self, symbol: str, exchange: str, + start: datetime, end: datetime, + limit: Optional[int] = None) -> List[TradeEvent]: + """ + Retrieve historical trade data. + + Args: + symbol: Trading symbol + exchange: Exchange name + start: Start timestamp + end: End timestamp + limit: Maximum number of records to return + + Returns: + List[TradeEvent]: Historical trade data + """ + pass + + @abstractmethod + async def get_latest_orderbook(self, symbol: str, exchange: str) -> Optional[OrderBookSnapshot]: + """ + Get latest order book snapshot. + + Args: + symbol: Trading symbol + exchange: Exchange name + + Returns: + OrderBookSnapshot: Latest order book or None if not found + """ + pass + + @abstractmethod + async def get_latest_heatmap(self, symbol: str, bucket_size: float) -> Optional[HeatmapData]: + """ + Get latest heatmap data. + + Args: + symbol: Trading symbol + bucket_size: Price bucket size + + Returns: + HeatmapData: Latest heatmap or None if not found + """ + pass + + @abstractmethod + async def get_ohlcv_data(self, symbol: str, exchange: str, timeframe: str, + start: datetime, end: datetime) -> List[Dict[str, Any]]: + """ + Get OHLCV candlestick data. + + Args: + symbol: Trading symbol + exchange: Exchange name + timeframe: Timeframe (e.g., '1m', '5m', '1h') + start: Start timestamp + end: End timestamp + + Returns: + List[Dict]: OHLCV data + """ + pass + + @abstractmethod + async def batch_store_orderbooks(self, data: List[OrderBookSnapshot]) -> int: + """ + Store multiple order book snapshots in batch. 
+ + Args: + data: List of order book snapshots + + Returns: + int: Number of records stored successfully + """ + pass + + @abstractmethod + async def batch_store_trades(self, data: List[TradeEvent]) -> int: + """ + Store multiple trade events in batch. + + Args: + data: List of trade events + + Returns: + int: Number of records stored successfully + """ + pass + + @abstractmethod + def setup_database_schema(self) -> None: + """ + Set up database schema and tables. + Should be idempotent - safe to call multiple times. + """ + pass + + @abstractmethod + async def cleanup_old_data(self, retention_days: int) -> int: + """ + Clean up old data based on retention policy. + + Args: + retention_days: Number of days to retain data + + Returns: + int: Number of records deleted + """ + pass + + @abstractmethod + async def get_storage_stats(self) -> Dict[str, Any]: + """ + Get storage statistics. + + Returns: + Dict: Storage statistics (table sizes, record counts, etc.) + """ + pass + + @abstractmethod + async def health_check(self) -> bool: + """ + Check storage system health. + + Returns: + bool: True if healthy, False otherwise + """ + pass \ No newline at end of file diff --git a/COBY/models/__init__.py b/COBY/models/__init__.py new file mode 100644 index 0000000..4d0126c --- /dev/null +++ b/COBY/models/__init__.py @@ -0,0 +1,31 @@ +""" +Data models for the multi-exchange data aggregation system. +""" + +from .core import ( + OrderBookSnapshot, + PriceLevel, + TradeEvent, + PriceBuckets, + HeatmapData, + HeatmapPoint, + ConnectionStatus, + OrderBookMetrics, + ImbalanceMetrics, + ConsolidatedOrderBook, + ReplayStatus +) + +__all__ = [ + 'OrderBookSnapshot', + 'PriceLevel', + 'TradeEvent', + 'PriceBuckets', + 'HeatmapData', + 'HeatmapPoint', + 'ConnectionStatus', + 'OrderBookMetrics', + 'ImbalanceMetrics', + 'ConsolidatedOrderBook', + 'ReplayStatus' +] \ No newline at end of file diff --git a/COBY/models/core.py b/COBY/models/core.py new file mode 100644 index 0000000..17585f3 --- /dev/null +++ b/COBY/models/core.py @@ -0,0 +1,324 @@ +""" +Core data models for the multi-exchange data aggregation system. 
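+
+Illustrative usage (values are examples only, not from the spec):
+
+    >>> level = PriceLevel(price=50000.0, size=1.5)
+    >>> level.price
+    50000.0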
+""" + +from dataclasses import dataclass, field +from datetime import datetime +from typing import List, Dict, Optional, Any +from enum import Enum + + +class ConnectionStatus(Enum): + """Exchange connection status""" + DISCONNECTED = "disconnected" + CONNECTING = "connecting" + CONNECTED = "connected" + RECONNECTING = "reconnecting" + ERROR = "error" + + +class ReplayStatus(Enum): + """Replay session status""" + CREATED = "created" + RUNNING = "running" + PAUSED = "paused" + STOPPED = "stopped" + COMPLETED = "completed" + ERROR = "error" + + +@dataclass +class PriceLevel: + """Individual price level in order book""" + price: float + size: float + count: Optional[int] = None + + def __post_init__(self): + """Validate price level data""" + if self.price <= 0: + raise ValueError("Price must be positive") + if self.size < 0: + raise ValueError("Size cannot be negative") + + +@dataclass +class OrderBookSnapshot: + """Standardized order book snapshot""" + symbol: str + exchange: str + timestamp: datetime + bids: List[PriceLevel] + asks: List[PriceLevel] + sequence_id: Optional[int] = None + + def __post_init__(self): + """Validate and sort order book data""" + if not self.symbol: + raise ValueError("Symbol cannot be empty") + if not self.exchange: + raise ValueError("Exchange cannot be empty") + + # Sort bids descending (highest price first) + self.bids.sort(key=lambda x: x.price, reverse=True) + # Sort asks ascending (lowest price first) + self.asks.sort(key=lambda x: x.price) + + @property + def mid_price(self) -> Optional[float]: + """Calculate mid price""" + if self.bids and self.asks: + return (self.bids[0].price + self.asks[0].price) / 2 + return None + + @property + def spread(self) -> Optional[float]: + """Calculate bid-ask spread""" + if self.bids and self.asks: + return self.asks[0].price - self.bids[0].price + return None + + @property + def bid_volume(self) -> float: + """Total bid volume""" + return sum(level.size for level in self.bids) + + @property + def ask_volume(self) -> float: + """Total ask volume""" + return sum(level.size for level in self.asks) + + +@dataclass +class TradeEvent: + """Standardized trade event""" + symbol: str + exchange: str + timestamp: datetime + price: float + size: float + side: str # 'buy' or 'sell' + trade_id: str + + def __post_init__(self): + """Validate trade event data""" + if not self.symbol: + raise ValueError("Symbol cannot be empty") + if not self.exchange: + raise ValueError("Exchange cannot be empty") + if self.price <= 0: + raise ValueError("Price must be positive") + if self.size <= 0: + raise ValueError("Size must be positive") + if self.side not in ['buy', 'sell']: + raise ValueError("Side must be 'buy' or 'sell'") + if not self.trade_id: + raise ValueError("Trade ID cannot be empty") + + +@dataclass +class PriceBuckets: + """Aggregated price buckets for heatmap""" + symbol: str + timestamp: datetime + bucket_size: float + bid_buckets: Dict[float, float] = field(default_factory=dict) # price -> volume + ask_buckets: Dict[float, float] = field(default_factory=dict) # price -> volume + + def __post_init__(self): + """Validate price buckets""" + if self.bucket_size <= 0: + raise ValueError("Bucket size must be positive") + + def get_bucket_price(self, price: float) -> float: + """Get bucket price for a given price""" + return round(price / self.bucket_size) * self.bucket_size + + def add_bid(self, price: float, volume: float): + """Add bid volume to appropriate bucket""" + bucket_price = self.get_bucket_price(price) + 
self.bid_buckets[bucket_price] = self.bid_buckets.get(bucket_price, 0) + volume + + def add_ask(self, price: float, volume: float): + """Add ask volume to appropriate bucket""" + bucket_price = self.get_bucket_price(price) + self.ask_buckets[bucket_price] = self.ask_buckets.get(bucket_price, 0) + volume + + +@dataclass +class HeatmapPoint: + """Individual heatmap data point""" + price: float + volume: float + intensity: float # 0.0 to 1.0 + side: str # 'bid' or 'ask' + + def __post_init__(self): + """Validate heatmap point""" + if self.price <= 0: + raise ValueError("Price must be positive") + if self.volume < 0: + raise ValueError("Volume cannot be negative") + if not 0 <= self.intensity <= 1: + raise ValueError("Intensity must be between 0 and 1") + if self.side not in ['bid', 'ask']: + raise ValueError("Side must be 'bid' or 'ask'") + + +@dataclass +class HeatmapData: + """Heatmap visualization data""" + symbol: str + timestamp: datetime + bucket_size: float + data: List[HeatmapPoint] = field(default_factory=list) + + def __post_init__(self): + """Validate heatmap data""" + if self.bucket_size <= 0: + raise ValueError("Bucket size must be positive") + + def add_point(self, price: float, volume: float, side: str, max_volume: float = None): + """Add a heatmap point with calculated intensity""" + if max_volume is None: + max_volume = max((point.volume for point in self.data), default=volume) + + intensity = min(volume / max_volume, 1.0) if max_volume > 0 else 0.0 + point = HeatmapPoint(price=price, volume=volume, intensity=intensity, side=side) + self.data.append(point) + + def get_bids(self) -> List[HeatmapPoint]: + """Get bid points sorted by price descending""" + bids = [point for point in self.data if point.side == 'bid'] + return sorted(bids, key=lambda x: x.price, reverse=True) + + def get_asks(self) -> List[HeatmapPoint]: + """Get ask points sorted by price ascending""" + asks = [point for point in self.data if point.side == 'ask'] + return sorted(asks, key=lambda x: x.price) + + +@dataclass +class OrderBookMetrics: + """Order book analysis metrics""" + symbol: str + exchange: str + timestamp: datetime + mid_price: float + spread: float + spread_percentage: float + bid_volume: float + ask_volume: float + volume_imbalance: float # (bid_volume - ask_volume) / (bid_volume + ask_volume) + depth_10: float # Volume within 10 price levels + depth_50: float # Volume within 50 price levels + + def __post_init__(self): + """Validate metrics""" + if self.mid_price <= 0: + raise ValueError("Mid price must be positive") + if self.spread < 0: + raise ValueError("Spread cannot be negative") + + +@dataclass +class ImbalanceMetrics: + """Order book imbalance metrics""" + symbol: str + timestamp: datetime + volume_imbalance: float + price_imbalance: float + depth_imbalance: float + momentum_score: float # Derived from recent imbalance changes + + def __post_init__(self): + """Validate imbalance metrics""" + if not -1 <= self.volume_imbalance <= 1: + raise ValueError("Volume imbalance must be between -1 and 1") + + +@dataclass +class ConsolidatedOrderBook: + """Consolidated order book from multiple exchanges""" + symbol: str + timestamp: datetime + exchanges: List[str] + bids: List[PriceLevel] + asks: List[PriceLevel] + weighted_mid_price: float + total_bid_volume: float + total_ask_volume: float + exchange_weights: Dict[str, float] = field(default_factory=dict) + + def __post_init__(self): + """Validate consolidated order book""" + if not self.exchanges: + raise ValueError("At least one exchange must 
be specified") + if self.weighted_mid_price <= 0: + raise ValueError("Weighted mid price must be positive") + + +@dataclass +class ExchangeStatus: + """Exchange connection and health status""" + exchange: str + status: ConnectionStatus + last_message_time: Optional[datetime] = None + error_message: Optional[str] = None + connection_count: int = 0 + uptime_percentage: float = 0.0 + message_rate: float = 0.0 # Messages per second + + def __post_init__(self): + """Validate exchange status""" + if not self.exchange: + raise ValueError("Exchange name cannot be empty") + if not 0 <= self.uptime_percentage <= 100: + raise ValueError("Uptime percentage must be between 0 and 100") + + +@dataclass +class SystemMetrics: + """System performance metrics""" + timestamp: datetime + cpu_usage: float + memory_usage: float + disk_usage: float + network_io: Dict[str, float] = field(default_factory=dict) + database_connections: int = 0 + redis_connections: int = 0 + active_websockets: int = 0 + messages_per_second: float = 0.0 + processing_latency: float = 0.0 # Milliseconds + + def __post_init__(self): + """Validate system metrics""" + if not 0 <= self.cpu_usage <= 100: + raise ValueError("CPU usage must be between 0 and 100") + if not 0 <= self.memory_usage <= 100: + raise ValueError("Memory usage must be between 0 and 100") + + +@dataclass +class ReplaySession: + """Historical data replay session""" + session_id: str + start_time: datetime + end_time: datetime + speed: float # Playback speed multiplier + status: ReplayStatus + current_time: Optional[datetime] = None + progress: float = 0.0 # 0.0 to 1.0 + symbols: List[str] = field(default_factory=list) + exchanges: List[str] = field(default_factory=list) + + def __post_init__(self): + """Validate replay session""" + if not self.session_id: + raise ValueError("Session ID cannot be empty") + if self.start_time >= self.end_time: + raise ValueError("Start time must be before end time") + if self.speed <= 0: + raise ValueError("Speed must be positive") + if not 0 <= self.progress <= 1: + raise ValueError("Progress must be between 0 and 1") \ No newline at end of file diff --git a/COBY/utils/__init__.py b/COBY/utils/__init__.py new file mode 100644 index 0000000..17c2bca --- /dev/null +++ b/COBY/utils/__init__.py @@ -0,0 +1,22 @@ +""" +Utility functions and helpers for the multi-exchange data aggregation system. +""" + +from .logging import setup_logging, get_logger +from .validation import validate_symbol, validate_price, validate_volume +from .timing import get_current_timestamp, format_timestamp +from .exceptions import COBYException, ConnectionError, ValidationError, ProcessingError + +__all__ = [ + 'setup_logging', + 'get_logger', + 'validate_symbol', + 'validate_price', + 'validate_volume', + 'get_current_timestamp', + 'format_timestamp', + 'COBYException', + 'ConnectionError', + 'ValidationError', + 'ProcessingError' +] \ No newline at end of file diff --git a/COBY/utils/exceptions.py b/COBY/utils/exceptions.py new file mode 100644 index 0000000..43ad1a4 --- /dev/null +++ b/COBY/utils/exceptions.py @@ -0,0 +1,57 @@ +""" +Custom exceptions for the COBY system. 
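+
+Illustrative usage (the error_code value is an example):
+
+    >>> err = ValidationError("Price must be positive", error_code="E_PRICE")
+    >>> err.to_dict()['error']
+    'ValidationError'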
+""" + + +class COBYException(Exception): + """Base exception for COBY system""" + + def __init__(self, message: str, error_code: str = None, details: dict = None): + super().__init__(message) + self.message = message + self.error_code = error_code + self.details = details or {} + + def to_dict(self) -> dict: + """Convert exception to dictionary""" + return { + 'error': self.__class__.__name__, + 'message': self.message, + 'error_code': self.error_code, + 'details': self.details + } + + +class ConnectionError(COBYException): + """Exception raised for connection-related errors""" + pass + + +class ValidationError(COBYException): + """Exception raised for data validation errors""" + pass + + +class ProcessingError(COBYException): + """Exception raised for data processing errors""" + pass + + +class StorageError(COBYException): + """Exception raised for storage-related errors""" + pass + + +class ConfigurationError(COBYException): + """Exception raised for configuration errors""" + pass + + +class ReplayError(COBYException): + """Exception raised for replay-related errors""" + pass + + +class AggregationError(COBYException): + """Exception raised for aggregation errors""" + pass \ No newline at end of file diff --git a/COBY/utils/logging.py b/COBY/utils/logging.py new file mode 100644 index 0000000..138af3c --- /dev/null +++ b/COBY/utils/logging.py @@ -0,0 +1,149 @@ +""" +Logging utilities for the COBY system. +""" + +import logging +import logging.handlers +import sys +import uuid +from pathlib import Path +from typing import Optional +from contextvars import ContextVar + +# Context variable for correlation ID +correlation_id: ContextVar[Optional[str]] = ContextVar('correlation_id', default=None) + + +class CorrelationFilter(logging.Filter): + """Add correlation ID to log records""" + + def filter(self, record): + record.correlation_id = correlation_id.get() or 'N/A' + return True + + +class COBYFormatter(logging.Formatter): + """Custom formatter with correlation ID support""" + + def __init__(self, include_correlation_id: bool = True): + self.include_correlation_id = include_correlation_id + + if include_correlation_id: + fmt = '%(asctime)s - %(name)s - %(levelname)s - [%(correlation_id)s] - %(message)s' + else: + fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + + super().__init__(fmt, datefmt='%Y-%m-%d %H:%M:%S') + + +def setup_logging( + level: str = 'INFO', + log_file: Optional[str] = None, + max_file_size: int = 100, # MB + backup_count: int = 5, + enable_correlation_id: bool = True, + console_output: bool = True +) -> None: + """ + Set up logging configuration for the COBY system. 
+ + Args: + level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + log_file: Path to log file (None = no file logging) + max_file_size: Maximum log file size in MB + backup_count: Number of backup files to keep + enable_correlation_id: Whether to include correlation IDs in logs + console_output: Whether to output logs to console + """ + # Convert string level to logging constant + numeric_level = getattr(logging, level.upper(), logging.INFO) + + # Create root logger + root_logger = logging.getLogger() + root_logger.setLevel(numeric_level) + + # Clear existing handlers + root_logger.handlers.clear() + + # Create formatter + formatter = COBYFormatter(include_correlation_id=enable_correlation_id) + + # Add correlation filter if enabled + correlation_filter = CorrelationFilter() if enable_correlation_id else None + + # Console handler + if console_output: + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(numeric_level) + console_handler.setFormatter(formatter) + if correlation_filter: + console_handler.addFilter(correlation_filter) + root_logger.addHandler(console_handler) + + # File handler + if log_file: + # Create log directory if it doesn't exist + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + + # Rotating file handler + file_handler = logging.handlers.RotatingFileHandler( + log_file, + maxBytes=max_file_size * 1024 * 1024, # Convert MB to bytes + backupCount=backup_count + ) + file_handler.setLevel(numeric_level) + file_handler.setFormatter(formatter) + if correlation_filter: + file_handler.addFilter(correlation_filter) + root_logger.addHandler(file_handler) + + # Set specific logger levels + logging.getLogger('websockets').setLevel(logging.WARNING) + logging.getLogger('urllib3').setLevel(logging.WARNING) + logging.getLogger('requests').setLevel(logging.WARNING) + + +def get_logger(name: str) -> logging.Logger: + """ + Get a logger instance with the specified name. + + Args: + name: Logger name (typically __name__) + + Returns: + logging.Logger: Logger instance + """ + return logging.getLogger(name) + + +def set_correlation_id(corr_id: Optional[str] = None) -> str: + """ + Set correlation ID for current context. + + Args: + corr_id: Correlation ID (generates UUID if None) + + Returns: + str: The correlation ID that was set + """ + if corr_id is None: + corr_id = str(uuid.uuid4())[:8] # Short UUID + + correlation_id.set(corr_id) + return corr_id + + +def get_correlation_id() -> Optional[str]: + """ + Get current correlation ID. + + Returns: + str: Current correlation ID or None + """ + return correlation_id.get() + + +def clear_correlation_id() -> None: + """Clear correlation ID from current context.""" + correlation_id.set(None) \ No newline at end of file diff --git a/COBY/utils/timing.py b/COBY/utils/timing.py new file mode 100644 index 0000000..5e752f9 --- /dev/null +++ b/COBY/utils/timing.py @@ -0,0 +1,206 @@ +""" +Timing utilities for the COBY system. +""" + +import time +from datetime import datetime, timezone +from typing import Optional + + +def get_current_timestamp() -> datetime: + """ + Get current UTC timestamp. + + Returns: + datetime: Current UTC timestamp + """ + return datetime.now(timezone.utc) + + +def format_timestamp(timestamp: datetime, format_str: str = "%Y-%m-%d %H:%M:%S.%f") -> str: + """ + Format timestamp to string. 
+ + Args: + timestamp: Timestamp to format + format_str: Format string + + Returns: + str: Formatted timestamp string + """ + return timestamp.strftime(format_str) + + +def parse_timestamp(timestamp_str: str, format_str: str = "%Y-%m-%d %H:%M:%S.%f") -> datetime: + """ + Parse timestamp string to datetime. + + Args: + timestamp_str: Timestamp string to parse + format_str: Format string + + Returns: + datetime: Parsed timestamp + """ + dt = datetime.strptime(timestamp_str, format_str) + # Ensure timezone awareness + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + + +def timestamp_to_unix(timestamp: datetime) -> float: + """ + Convert datetime to Unix timestamp. + + Args: + timestamp: Datetime to convert + + Returns: + float: Unix timestamp + """ + return timestamp.timestamp() + + +def unix_to_timestamp(unix_time: float) -> datetime: + """ + Convert Unix timestamp to datetime. + + Args: + unix_time: Unix timestamp + + Returns: + datetime: Converted datetime (UTC) + """ + return datetime.fromtimestamp(unix_time, tz=timezone.utc) + + +def calculate_time_diff(start: datetime, end: datetime) -> float: + """ + Calculate time difference in seconds. + + Args: + start: Start timestamp + end: End timestamp + + Returns: + float: Time difference in seconds + """ + return (end - start).total_seconds() + + +def is_timestamp_recent(timestamp: datetime, max_age_seconds: int = 60) -> bool: + """ + Check if timestamp is recent (within max_age_seconds). + + Args: + timestamp: Timestamp to check + max_age_seconds: Maximum age in seconds + + Returns: + bool: True if recent, False otherwise + """ + now = get_current_timestamp() + age = calculate_time_diff(timestamp, now) + return age <= max_age_seconds + + +def sleep_until(target_time: datetime) -> None: + """ + Sleep until target time. + + Args: + target_time: Target timestamp to sleep until + """ + now = get_current_timestamp() + sleep_seconds = calculate_time_diff(now, target_time) + + if sleep_seconds > 0: + time.sleep(sleep_seconds) + + +def get_milliseconds() -> int: + """ + Get current timestamp in milliseconds. + + Returns: + int: Current timestamp in milliseconds + """ + return int(time.time() * 1000) + + +def milliseconds_to_timestamp(ms: int) -> datetime: + """ + Convert milliseconds to datetime. + + Args: + ms: Milliseconds timestamp + + Returns: + datetime: Converted datetime (UTC) + """ + return datetime.fromtimestamp(ms / 1000.0, tz=timezone.utc) + + +def round_timestamp(timestamp: datetime, seconds: int) -> datetime: + """ + Round timestamp to nearest interval. + + Args: + timestamp: Timestamp to round + seconds: Interval in seconds + + Returns: + datetime: Rounded timestamp + """ + unix_time = timestamp_to_unix(timestamp) + rounded_unix = round(unix_time / seconds) * seconds + return unix_to_timestamp(rounded_unix) + + +class Timer: + """Simple timer for measuring execution time""" + + def __init__(self): + self.start_time: Optional[float] = None + self.end_time: Optional[float] = None + + def start(self) -> None: + """Start the timer""" + self.start_time = time.perf_counter() + self.end_time = None + + def stop(self) -> float: + """ + Stop the timer and return elapsed time. + + Returns: + float: Elapsed time in seconds + """ + if self.start_time is None: + raise ValueError("Timer not started") + + self.end_time = time.perf_counter() + return self.elapsed() + + def elapsed(self) -> float: + """ + Get elapsed time. 
+ + Returns: + float: Elapsed time in seconds + """ + if self.start_time is None: + return 0.0 + + end = self.end_time or time.perf_counter() + return end - self.start_time + + def __enter__(self): + """Context manager entry""" + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.stop() \ No newline at end of file diff --git a/COBY/utils/validation.py b/COBY/utils/validation.py new file mode 100644 index 0000000..6c398c2 --- /dev/null +++ b/COBY/utils/validation.py @@ -0,0 +1,217 @@ +""" +Data validation utilities for the COBY system. +""" + +import re +from typing import List, Optional +from decimal import Decimal, InvalidOperation + + +def validate_symbol(symbol: str) -> bool: + """ + Validate trading symbol format. + + Args: + symbol: Trading symbol to validate + + Returns: + bool: True if valid, False otherwise + """ + if not symbol or not isinstance(symbol, str): + return False + + # Basic symbol format validation (e.g., BTCUSDT, ETH-USD) + pattern = r'^[A-Z0-9]{2,10}[-/]?[A-Z0-9]{2,10}$' + return bool(re.match(pattern, symbol.upper())) + + +def validate_price(price: float) -> bool: + """ + Validate price value. + + Args: + price: Price to validate + + Returns: + bool: True if valid, False otherwise + """ + if not isinstance(price, (int, float, Decimal)): + return False + + try: + price_decimal = Decimal(str(price)) + return price_decimal > 0 and price_decimal < Decimal('1e10') # Reasonable upper bound + except (InvalidOperation, ValueError): + return False + + +def validate_volume(volume: float) -> bool: + """ + Validate volume value. + + Args: + volume: Volume to validate + + Returns: + bool: True if valid, False otherwise + """ + if not isinstance(volume, (int, float, Decimal)): + return False + + try: + volume_decimal = Decimal(str(volume)) + return volume_decimal >= 0 and volume_decimal < Decimal('1e15') # Reasonable upper bound + except (InvalidOperation, ValueError): + return False + + +def validate_exchange_name(exchange: str) -> bool: + """ + Validate exchange name. + + Args: + exchange: Exchange name to validate + + Returns: + bool: True if valid, False otherwise + """ + if not exchange or not isinstance(exchange, str): + return False + + # Exchange name should be alphanumeric with possible underscores/hyphens + pattern = r'^[a-zA-Z0-9_-]{2,20}$' + return bool(re.match(pattern, exchange)) + + +def validate_timestamp_range(start_time, end_time) -> List[str]: + """ + Validate timestamp range. + + Args: + start_time: Start timestamp + end_time: End timestamp + + Returns: + List[str]: List of validation errors (empty if valid) + """ + errors = [] + + if start_time is None: + errors.append("Start time cannot be None") + + if end_time is None: + errors.append("End time cannot be None") + + if start_time and end_time and start_time >= end_time: + errors.append("Start time must be before end time") + + return errors + + +def validate_bucket_size(bucket_size: float) -> bool: + """ + Validate price bucket size. + + Args: + bucket_size: Bucket size to validate + + Returns: + bool: True if valid, False otherwise + """ + if not isinstance(bucket_size, (int, float, Decimal)): + return False + + try: + size_decimal = Decimal(str(bucket_size)) + return size_decimal > 0 and size_decimal <= Decimal('1000') # Reasonable upper bound + except (InvalidOperation, ValueError): + return False + + +def validate_speed_multiplier(speed: float) -> bool: + """ + Validate replay speed multiplier. 
+ + Args: + speed: Speed multiplier to validate + + Returns: + bool: True if valid, False otherwise + """ + if not isinstance(speed, (int, float)): + return False + + return 0.01 <= speed <= 100.0 # 1% to 100x speed + + +def sanitize_symbol(symbol: str) -> str: + """ + Sanitize and normalize symbol format. + + Args: + symbol: Symbol to sanitize + + Returns: + str: Sanitized symbol + """ + if not symbol: + return "" + + # Remove whitespace and convert to uppercase + sanitized = symbol.strip().upper() + + # Remove invalid characters + sanitized = re.sub(r'[^A-Z0-9/-]', '', sanitized) + + return sanitized + + +def validate_percentage(value: float, min_val: float = 0.0, max_val: float = 100.0) -> bool: + """ + Validate percentage value. + + Args: + value: Percentage value to validate + min_val: Minimum allowed value + max_val: Maximum allowed value + + Returns: + bool: True if valid, False otherwise + """ + if not isinstance(value, (int, float)): + return False + + return min_val <= value <= max_val + + +def validate_connection_config(config: dict) -> List[str]: + """ + Validate connection configuration. + + Args: + config: Configuration dictionary + + Returns: + List[str]: List of validation errors (empty if valid) + """ + errors = [] + + # Required fields + required_fields = ['host', 'port'] + for field in required_fields: + if field not in config: + errors.append(f"Missing required field: {field}") + + # Validate host + if 'host' in config: + host = config['host'] + if not isinstance(host, str) or not host.strip(): + errors.append("Host must be a non-empty string") + + # Validate port + if 'port' in config: + port = config['port'] + if not isinstance(port, int) or not (1 <= port <= 65535): + errors.append("Port must be an integer between 1 and 65535") + + return errors \ No newline at end of file diff --git a/web/clean_dashboard.py b/web/clean_dashboard.py index b387761..f45b1fc 100644 --- a/web/clean_dashboard.py +++ b/web/clean_dashboard.py @@ -9927,7 +9927,6 @@ def create_clean_dashboard(data_provider: Optional[DataProvider] = None, orchest def signal_handler(sig, frame): logger.info("Received shutdown signal") - self.shutdown() # Assuming a shutdown method exists or add one sys.exit(0) # Only set signal handlers if we're in the main thread
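
The exception and logging utilities introduced in this patch are meant to be used together: exceptions carry structured context via `to_dict()`, and log records carry a per-request correlation ID. Below is a minimal usage sketch, not part of the patch itself; it assumes the repository root is on `PYTHONPATH` so the `COBY` package is importable, and it only uses symbols defined in `COBY/utils/exceptions.py` and `COBY/utils/logging.py`.

```python
# Minimal sketch: structured exceptions plus correlation-aware logging.
# Assumes the repository root is on PYTHONPATH so `COBY` is importable.
from COBY.utils.exceptions import ValidationError
from COBY.utils.logging import (
    setup_logging, get_logger, set_correlation_id, clear_correlation_id,
)

# Console-only logging with correlation IDs enabled (no log file).
setup_logging(level='DEBUG', log_file=None, enable_correlation_id=True)
logger = get_logger(__name__)

# Tag all log lines emitted while handling one message with a short ID.
corr_id = set_correlation_id()
logger.info("Processing order book snapshot (correlation_id=%s)", corr_id)

try:
    raise ValidationError(
        "Price must be positive",
        error_code="INVALID_PRICE",
        details={"exchange": "binance", "symbol": "BTCUSDT", "price": -1.0},
    )
except ValidationError as exc:
    # to_dict() yields a JSON-serialisable payload for APIs or structured logs.
    logger.error("Validation failed: %s", exc.to_dict())
finally:
    clear_correlation_id()
```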
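
The timing helpers can likewise be combined to align events to fixed intervals and to measure processing latency. The sketch below uses only functions defined in `COBY/utils/timing.py`; the 60-second interval and the 5-second freshness window are illustrative choices, not values mandated by the spec.

```python
# Minimal sketch: timestamp alignment and latency measurement.
from COBY.utils.timing import (
    Timer, get_current_timestamp, round_timestamp,
    is_timestamp_recent, get_milliseconds, milliseconds_to_timestamp,
)

now = get_current_timestamp()

# Round an event timestamp to the nearest 60-second boundary.
bucket = round_timestamp(now, 60)
print(f"event={now.isoformat()} bucket={bucket.isoformat()}")

# Reject stale data older than 5 seconds.
print("fresh:", is_timestamp_recent(now, max_age_seconds=5))

# Exchange payloads often carry millisecond epochs; convert to aware datetimes.
event_time = milliseconds_to_timestamp(get_milliseconds())

# Measure how long a processing step takes using the context-manager form.
with Timer() as timer:
    _ = sum(range(100_000))  # placeholder for real work
print(f"processing took {timer.elapsed():.6f}s for event at {event_time.isoformat()}")
```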
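
Finally, the validation helpers return booleans or lists of error strings rather than raising, so callers decide how to react. A sketch of a typical pre-insert check follows, again reusing only names defined in `COBY/utils/validation.py` and `COBY/utils/exceptions.py`; the example values (exchange name, symbol, port 5432) are hypothetical.

```python
# Minimal sketch: validating an incoming tick before storage.
from COBY.utils.exceptions import ValidationError
from COBY.utils.validation import (
    sanitize_symbol, validate_symbol, validate_exchange_name,
    validate_price, validate_volume, validate_connection_config,
)

def check_tick(exchange: str, symbol: str, price: float, volume: float) -> str:
    """Return the normalised symbol or raise ValidationError with details."""
    clean_symbol = sanitize_symbol(symbol)
    problems = []
    if not validate_exchange_name(exchange):
        problems.append(f"bad exchange name: {exchange!r}")
    if not validate_symbol(clean_symbol):
        problems.append(f"bad symbol: {symbol!r}")
    if not validate_price(price):
        problems.append(f"bad price: {price!r}")
    if not validate_volume(volume):
        problems.append(f"bad volume: {volume!r}")
    if problems:
        raise ValidationError("Tick rejected", error_code="BAD_TICK",
                              details={"problems": problems})
    return clean_symbol

print(check_tick("binance", " btc/usdt ", 65000.5, 0.25))   # -> BTC/USDT

# Connection settings are validated the same way: an empty list means OK.
errors = validate_connection_config({"host": "localhost", "port": 5432})
print(errors or "connection config OK")
```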