diff --git a/.kiro/specs/multi-exchange-data-aggregation/tasks.md b/.kiro/specs/multi-exchange-data-aggregation/tasks.md index 6055d65..cbd19d1 100644 --- a/.kiro/specs/multi-exchange-data-aggregation/tasks.md +++ b/.kiro/specs/multi-exchange-data-aggregation/tasks.md @@ -15,7 +15,9 @@ - _Requirements: 1.1, 6.1, 7.3_ -- [ ] 2. Implement TimescaleDB integration and database schema + +- [x] 2. Implement TimescaleDB integration and database schema + - Create TimescaleDB connection manager with connection pooling @@ -28,6 +30,7 @@ + - [ ] 3. Create base exchange connector framework - Implement abstract base class for exchange WebSocket connectors - Create connection management with exponential backoff and circuit breaker patterns @@ -38,6 +41,7 @@ - Add connection status monitoring and health checks - _Requirements: 1.1, 1.3, 1.4, 8.5_ + - [ ] 4. Implement Binance exchange connector - Create Binance-specific WebSocket connector extending the base framework @@ -52,6 +56,7 @@ + - [ ] 5. Create data processing and normalization engine - Implement data processor for normalizing raw exchange data - Create validation logic for order book and trade data @@ -60,6 +65,7 @@ - Add metrics calculation for order book statistics + - Write comprehensive unit tests for data processing logic - _Requirements: 1.4, 6.3, 8.1_ @@ -69,6 +75,7 @@ - Create aggregation engine for converting order book data to price buckets - Implement configurable bucket sizes ($10 for BTC, $1 for ETH) - Create heatmap data structure generation from price buckets + - Implement real-time aggregation with high-frequency updates - Add volume-weighted aggregation calculations - _Requirements: 2.1, 2.2, 2.3, 2.4, 8.1, 8.2_ @@ -76,6 +83,7 @@ - [ ] 7. 
Build Redis caching layer - Implement Redis connection manager with connection pooling - Create caching strategies for latest order book data and heatmaps + - Implement cache invalidation and TTL management - Add cache performance monitoring and metrics - Write tests for caching functionality @@ -83,13 +91,17 @@ - [ ] 8. Create live data API endpoints - Implement REST API for accessing current order book data + - Create WebSocket API for real-time data streaming - Add endpoints for heatmap data retrieval - Implement API rate limiting and authentication - Create comprehensive API documentation - _Requirements: 4.1, 4.2, 4.4, 6.3_ -- [ ] 9. Implement web dashboard for visualization +- [x] 9. Implement web dashboard for visualization + + + - Create HTML/CSS/JavaScript dashboard for real-time heatmap visualization - Implement WebSocket client for receiving real-time updates - Create progress bar visualization for aggregated price buckets diff --git a/COBY/api/replay_api.py b/COBY/api/replay_api.py new file mode 100644 index 0000000..d41527c --- /dev/null +++ b/COBY/api/replay_api.py @@ -0,0 +1,306 @@ +""" +REST API endpoints for historical data replay functionality. 
+""" + +from fastapi import APIRouter, HTTPException, Query, Path +from typing import Optional, List, Dict, Any +from datetime import datetime +from pydantic import BaseModel, Field + +from ..replay.replay_manager import HistoricalReplayManager +from ..models.core import ReplayStatus +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import ReplayError, ValidationError + +logger = get_logger(__name__) + + +class CreateReplayRequest(BaseModel): + """Request model for creating replay session""" + start_time: datetime = Field(..., description="Replay start time") + end_time: datetime = Field(..., description="Replay end time") + speed: float = Field(1.0, gt=0, le=100, description="Playback speed multiplier") + symbols: Optional[List[str]] = Field(None, description="Symbols to replay") + exchanges: Optional[List[str]] = Field(None, description="Exchanges to replay") + + +class ReplayControlRequest(BaseModel): + """Request model for replay control operations""" + action: str = Field(..., description="Control action: start, pause, resume, stop") + + +class SeekRequest(BaseModel): + """Request model for seeking in replay""" + timestamp: datetime = Field(..., description="Target timestamp") + + +class SpeedRequest(BaseModel): + """Request model for changing replay speed""" + speed: float = Field(..., gt=0, le=100, description="New playback speed") + + +def create_replay_router(replay_manager: HistoricalReplayManager) -> APIRouter: + """Create replay API router with endpoints""" + + router = APIRouter(prefix="/replay", tags=["replay"]) + + @router.post("/sessions", response_model=Dict[str, str]) + async def create_replay_session(request: CreateReplayRequest): + """Create a new replay session""" + try: + set_correlation_id() + + session_id = replay_manager.create_replay_session( + start_time=request.start_time, + end_time=request.end_time, + speed=request.speed, + symbols=request.symbols, + exchanges=request.exchanges + ) + + 
logger.info(f"Created replay session {session_id}") + + return { + "session_id": session_id, + "status": "created", + "message": "Replay session created successfully" + } + + except ValidationError as e: + logger.warning(f"Invalid replay request: {e}") + raise HTTPException(status_code=400, detail=str(e)) + except ReplayError as e: + logger.error(f"Replay creation failed: {e}") + raise HTTPException(status_code=500, detail=str(e)) + except Exception as e: + logger.error(f"Unexpected error creating replay session: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + @router.get("/sessions", response_model=List[Dict[str, Any]]) + async def list_replay_sessions(): + """List all replay sessions""" + try: + sessions = replay_manager.list_replay_sessions() + + return [ + { + "session_id": session.session_id, + "start_time": session.start_time.isoformat(), + "end_time": session.end_time.isoformat(), + "current_time": session.current_time.isoformat(), + "speed": session.speed, + "status": session.status.value, + "symbols": session.symbols, + "exchanges": session.exchanges, + "progress": session.progress, + "events_replayed": session.events_replayed, + "total_events": session.total_events, + "created_at": session.created_at.isoformat(), + "started_at": session.started_at.isoformat() if session.started_at else None, + "error_message": getattr(session, 'error_message', None) + } + for session in sessions + ] + + except Exception as e: + logger.error(f"Error listing replay sessions: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + @router.get("/sessions/{session_id}", response_model=Dict[str, Any]) + async def get_replay_session(session_id: str = Path(..., description="Session ID")): + """Get replay session details""" + try: + session = replay_manager.get_replay_status(session_id) + + if not session: + raise HTTPException(status_code=404, detail="Session not found") + + return { + "session_id": session.session_id, + 
"start_time": session.start_time.isoformat(), + "end_time": session.end_time.isoformat(), + "current_time": session.current_time.isoformat(), + "speed": session.speed, + "status": session.status.value, + "symbols": session.symbols, + "exchanges": session.exchanges, + "progress": session.progress, + "events_replayed": session.events_replayed, + "total_events": session.total_events, + "created_at": session.created_at.isoformat(), + "started_at": session.started_at.isoformat() if session.started_at else None, + "paused_at": session.paused_at.isoformat() if session.paused_at else None, + "stopped_at": session.stopped_at.isoformat() if session.stopped_at else None, + "completed_at": session.completed_at.isoformat() if session.completed_at else None, + "error_message": getattr(session, 'error_message', None) + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting replay session {session_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + @router.post("/sessions/{session_id}/control", response_model=Dict[str, str]) + async def control_replay_session( + session_id: str = Path(..., description="Session ID"), + request: ReplayControlRequest = None + ): + """Control replay session (start, pause, resume, stop)""" + try: + set_correlation_id() + + if not request: + raise HTTPException(status_code=400, detail="Control action required") + + action = request.action.lower() + + if action == "start": + await replay_manager.start_replay(session_id) + message = "Replay started" + elif action == "pause": + await replay_manager.pause_replay(session_id) + message = "Replay paused" + elif action == "resume": + await replay_manager.resume_replay(session_id) + message = "Replay resumed" + elif action == "stop": + await replay_manager.stop_replay(session_id) + message = "Replay stopped" + else: + raise HTTPException(status_code=400, detail="Invalid action") + + logger.info(f"Replay session {session_id} action: {action}") 
+ + return { + "session_id": session_id, + "action": action, + "message": message + } + + except ReplayError as e: + logger.error(f"Replay control failed for {session_id}: {e}") + raise HTTPException(status_code=400, detail=str(e)) + except HTTPException: + raise + except Exception as e: + logger.error(f"Unexpected error controlling replay {session_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + @router.post("/sessions/{session_id}/seek", response_model=Dict[str, str]) + async def seek_replay_session( + session_id: str = Path(..., description="Session ID"), + request: SeekRequest = None + ): + """Seek to specific timestamp in replay""" + try: + if not request: + raise HTTPException(status_code=400, detail="Timestamp required") + + success = replay_manager.seek_replay(session_id, request.timestamp) + + if not success: + raise HTTPException(status_code=400, detail="Seek failed") + + logger.info(f"Seeked replay session {session_id} to {request.timestamp}") + + return { + "session_id": session_id, + "timestamp": request.timestamp.isoformat(), + "message": "Seek successful" + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error seeking replay session {session_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + @router.post("/sessions/{session_id}/speed", response_model=Dict[str, Any]) + async def set_replay_speed( + session_id: str = Path(..., description="Session ID"), + request: SpeedRequest = None + ): + """Change replay speed""" + try: + if not request: + raise HTTPException(status_code=400, detail="Speed required") + + success = replay_manager.set_replay_speed(session_id, request.speed) + + if not success: + raise HTTPException(status_code=400, detail="Speed change failed") + + logger.info(f"Set replay speed to {request.speed}x for session {session_id}") + + return { + "session_id": session_id, + "speed": request.speed, + "message": "Speed changed successfully" + 
} + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error setting replay speed for {session_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + @router.delete("/sessions/{session_id}", response_model=Dict[str, str]) + async def delete_replay_session(session_id: str = Path(..., description="Session ID")): + """Delete replay session""" + try: + success = replay_manager.delete_replay_session(session_id) + + if not success: + raise HTTPException(status_code=404, detail="Session not found") + + logger.info(f"Deleted replay session {session_id}") + + return { + "session_id": session_id, + "message": "Session deleted successfully" + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting replay session {session_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + @router.get("/data-range/{symbol}", response_model=Dict[str, Any]) + async def get_data_range( + symbol: str = Path(..., description="Trading symbol"), + exchange: Optional[str] = Query(None, description="Exchange name") + ): + """Get available data time range for a symbol""" + try: + data_range = await replay_manager.get_available_data_range(symbol, exchange) + + if not data_range: + raise HTTPException(status_code=404, detail="No data available for symbol") + + return { + "symbol": symbol, + "exchange": exchange, + "start_time": data_range['start'].isoformat(), + "end_time": data_range['end'].isoformat(), + "duration_days": (data_range['end'] - data_range['start']).days + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting data range for {symbol}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + @router.get("/stats", response_model=Dict[str, Any]) + async def get_replay_stats(): + """Get replay system statistics""" + try: + return replay_manager.get_stats() + + except Exception as e: + logger.error(f"Error 
getting replay stats: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + return router \ No newline at end of file diff --git a/COBY/api/replay_websocket.py b/COBY/api/replay_websocket.py new file mode 100644 index 0000000..ba573ab --- /dev/null +++ b/COBY/api/replay_websocket.py @@ -0,0 +1,435 @@ +""" +WebSocket server for real-time replay data streaming. +""" + +import asyncio +import json +import logging +from typing import Dict, Set, Optional, Any +from fastapi import WebSocket, WebSocketDisconnect +from datetime import datetime + +from ..replay.replay_manager import HistoricalReplayManager +from ..models.core import OrderBookSnapshot, TradeEvent, ReplayStatus +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import ReplayError + +logger = get_logger(__name__) + + +class ReplayWebSocketManager: + """ + WebSocket manager for replay data streaming. + + Provides: + - Real-time replay data streaming + - Session-based connections + - Automatic cleanup on disconnect + - Status updates + """ + + def __init__(self, replay_manager: HistoricalReplayManager): + """ + Initialize WebSocket manager. + + Args: + replay_manager: Replay manager instance + """ + self.replay_manager = replay_manager + + # Connection management + self.connections: Dict[str, Set[WebSocket]] = {} # session_id -> websockets + self.websocket_sessions: Dict[WebSocket, str] = {} # websocket -> session_id + + # Statistics + self.stats = { + 'active_connections': 0, + 'total_connections': 0, + 'messages_sent': 0, + 'connection_errors': 0 + } + + logger.info("Replay WebSocket manager initialized") + + async def connect_to_session(self, websocket: WebSocket, session_id: str) -> bool: + """ + Connect WebSocket to a replay session. 
+ + Args: + websocket: WebSocket connection + session_id: Replay session ID + + Returns: + bool: True if connected successfully, False otherwise + """ + try: + set_correlation_id() + + # Check if session exists + session = self.replay_manager.get_replay_status(session_id) + if not session: + await websocket.send_json({ + "type": "error", + "message": f"Session {session_id} not found" + }) + return False + + # Accept WebSocket connection + await websocket.accept() + + # Add to connection tracking + if session_id not in self.connections: + self.connections[session_id] = set() + + self.connections[session_id].add(websocket) + self.websocket_sessions[websocket] = session_id + + # Update statistics + self.stats['active_connections'] += 1 + self.stats['total_connections'] += 1 + + # Add callbacks to replay session + self.replay_manager.add_data_callback(session_id, self._data_callback) + self.replay_manager.add_status_callback(session_id, self._status_callback) + + # Send initial session status + await self._send_session_status(websocket, session) + + logger.info(f"WebSocket connected to replay session {session_id}") + return True + + except Exception as e: + logger.error(f"Failed to connect WebSocket to session {session_id}: {e}") + self.stats['connection_errors'] += 1 + return False + + async def disconnect(self, websocket: WebSocket) -> None: + """ + Disconnect WebSocket and cleanup. 
+ + Args: + websocket: WebSocket connection to disconnect + """ + try: + session_id = self.websocket_sessions.get(websocket) + + if session_id: + # Remove from connection tracking + if session_id in self.connections: + self.connections[session_id].discard(websocket) + + # Clean up empty session connections + if not self.connections[session_id]: + del self.connections[session_id] + + del self.websocket_sessions[websocket] + + # Update statistics + self.stats['active_connections'] -= 1 + + logger.info(f"WebSocket disconnected from replay session {session_id}") + + except Exception as e: + logger.error(f"Error during WebSocket disconnect: {e}") + + async def handle_websocket_messages(self, websocket: WebSocket) -> None: + """ + Handle incoming WebSocket messages. + + Args: + websocket: WebSocket connection + """ + try: + while True: + # Receive message + message = await websocket.receive_json() + + # Process message + await self._process_websocket_message(websocket, message) + + except WebSocketDisconnect: + logger.info("WebSocket disconnected") + except Exception as e: + logger.error(f"WebSocket message handling error: {e}") + await websocket.send_json({ + "type": "error", + "message": "Message processing error" + }) + + async def _process_websocket_message(self, websocket: WebSocket, message: Dict[str, Any]) -> None: + """ + Process incoming WebSocket message. 
+ + Args: + websocket: WebSocket connection + message: Received message + """ + try: + message_type = message.get('type') + session_id = self.websocket_sessions.get(websocket) + + if not session_id: + await websocket.send_json({ + "type": "error", + "message": "Not connected to any session" + }) + return + + if message_type == "control": + await self._handle_control_message(websocket, session_id, message) + elif message_type == "seek": + await self._handle_seek_message(websocket, session_id, message) + elif message_type == "speed": + await self._handle_speed_message(websocket, session_id, message) + elif message_type == "status": + await self._handle_status_request(websocket, session_id) + else: + await websocket.send_json({ + "type": "error", + "message": f"Unknown message type: {message_type}" + }) + + except Exception as e: + logger.error(f"Error processing WebSocket message: {e}") + await websocket.send_json({ + "type": "error", + "message": "Message processing failed" + }) + + async def _handle_control_message(self, websocket: WebSocket, session_id: str, + message: Dict[str, Any]) -> None: + """Handle replay control messages.""" + try: + action = message.get('action') + + if action == "start": + await self.replay_manager.start_replay(session_id) + elif action == "pause": + await self.replay_manager.pause_replay(session_id) + elif action == "resume": + await self.replay_manager.resume_replay(session_id) + elif action == "stop": + await self.replay_manager.stop_replay(session_id) + else: + await websocket.send_json({ + "type": "error", + "message": f"Invalid control action: {action}" + }) + return + + await websocket.send_json({ + "type": "control_response", + "action": action, + "status": "success" + }) + + except ReplayError as e: + await websocket.send_json({ + "type": "error", + "message": str(e) + }) + except Exception as e: + logger.error(f"Control message error: {e}") + await websocket.send_json({ + "type": "error", + "message": "Control action failed" + 
}) + + async def _handle_seek_message(self, websocket: WebSocket, session_id: str, + message: Dict[str, Any]) -> None: + """Handle seek messages.""" + try: + timestamp_str = message.get('timestamp') + if not timestamp_str: + await websocket.send_json({ + "type": "error", + "message": "Timestamp required for seek" + }) + return + + timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00')) + success = self.replay_manager.seek_replay(session_id, timestamp) + + await websocket.send_json({ + "type": "seek_response", + "timestamp": timestamp_str, + "status": "success" if success else "failed" + }) + + except Exception as e: + logger.error(f"Seek message error: {e}") + await websocket.send_json({ + "type": "error", + "message": "Seek failed" + }) + + async def _handle_speed_message(self, websocket: WebSocket, session_id: str, + message: Dict[str, Any]) -> None: + """Handle speed change messages.""" + try: + speed = message.get('speed') + if not speed or speed <= 0: + await websocket.send_json({ + "type": "error", + "message": "Valid speed required" + }) + return + + success = self.replay_manager.set_replay_speed(session_id, speed) + + await websocket.send_json({ + "type": "speed_response", + "speed": speed, + "status": "success" if success else "failed" + }) + + except Exception as e: + logger.error(f"Speed message error: {e}") + await websocket.send_json({ + "type": "error", + "message": "Speed change failed" + }) + + async def _handle_status_request(self, websocket: WebSocket, session_id: str) -> None: + """Handle status request messages.""" + try: + session = self.replay_manager.get_replay_status(session_id) + if session: + await self._send_session_status(websocket, session) + else: + await websocket.send_json({ + "type": "error", + "message": "Session not found" + }) + + except Exception as e: + logger.error(f"Status request error: {e}") + await websocket.send_json({ + "type": "error", + "message": "Status request failed" + }) + + async def 
_data_callback(self, data) -> None: + """Callback for replay data - broadcasts to all connected WebSockets.""" + try: + # Determine which session this data belongs to + # This is a simplified approach - in practice, you'd need to track + # which session generated this callback + + # Serialize data + if isinstance(data, OrderBookSnapshot): + message = { + "type": "orderbook", + "data": { + "symbol": data.symbol, + "exchange": data.exchange, + "timestamp": data.timestamp.isoformat(), + "bids": [{"price": b.price, "size": b.size} for b in data.bids[:10]], + "asks": [{"price": a.price, "size": a.size} for a in data.asks[:10]], + "sequence_id": data.sequence_id + } + } + elif isinstance(data, TradeEvent): + message = { + "type": "trade", + "data": { + "symbol": data.symbol, + "exchange": data.exchange, + "timestamp": data.timestamp.isoformat(), + "price": data.price, + "size": data.size, + "side": data.side, + "trade_id": data.trade_id + } + } + else: + return + + # Broadcast to all connections + await self._broadcast_message(message) + + except Exception as e: + logger.error(f"Data callback error: {e}") + + async def _status_callback(self, session_id: str, status: ReplayStatus) -> None: + """Callback for replay status changes.""" + try: + message = { + "type": "status", + "session_id": session_id, + "status": status.value, + "timestamp": datetime.utcnow().isoformat() + } + + # Send to connections for this session + if session_id in self.connections: + await self._broadcast_to_session(session_id, message) + + except Exception as e: + logger.error(f"Status callback error: {e}") + + async def _send_session_status(self, websocket: WebSocket, session) -> None: + """Send session status to WebSocket.""" + try: + message = { + "type": "session_status", + "data": { + "session_id": session.session_id, + "status": session.status.value, + "progress": session.progress, + "current_time": session.current_time.isoformat(), + "speed": session.speed, + "events_replayed": 
session.events_replayed, + "total_events": session.total_events + } + } + + await websocket.send_json(message) + self.stats['messages_sent'] += 1 + + except Exception as e: + logger.error(f"Error sending session status: {e}") + + async def _broadcast_message(self, message: Dict[str, Any]) -> None: + """Broadcast message to all connected WebSockets.""" + disconnected = [] + + for session_id, websockets in self.connections.items(): + for websocket in websockets.copy(): + try: + await websocket.send_json(message) + self.stats['messages_sent'] += 1 + except Exception as e: + logger.warning(f"Failed to send message to WebSocket: {e}") + disconnected.append((session_id, websocket)) + + # Clean up disconnected WebSockets + for session_id, websocket in disconnected: + await self.disconnect(websocket) + + async def _broadcast_to_session(self, session_id: str, message: Dict[str, Any]) -> None: + """Broadcast message to WebSockets connected to a specific session.""" + if session_id not in self.connections: + return + + disconnected = [] + + for websocket in self.connections[session_id].copy(): + try: + await websocket.send_json(message) + self.stats['messages_sent'] += 1 + except Exception as e: + logger.warning(f"Failed to send message to WebSocket: {e}") + disconnected.append(websocket) + + # Clean up disconnected WebSockets + for websocket in disconnected: + await self.disconnect(websocket) + + def get_stats(self) -> Dict[str, Any]: + """Get WebSocket manager statistics.""" + return { + **self.stats, + 'sessions_with_connections': len(self.connections), + 'total_websockets': sum(len(ws_set) for ws_set in self.connections.values()) + } \ No newline at end of file diff --git a/COBY/connectors/base_connector.py b/COBY/connectors/base_connector.py index cd2c82b..a2fb633 100644 --- a/COBY/connectors/base_connector.py +++ b/COBY/connectors/base_connector.py @@ -1,15 +1,15 @@ """ -Base exchange connector implementation with connection management and error handling. 
+Base exchange connector with WebSocket connection management, circuit breaker pattern, +and comprehensive error handling. """ import asyncio +import logging import json -import websockets +from abc import ABC, abstractmethod from typing import Dict, List, Optional, Callable, Any -from datetime import datetime, timezone - -from ..interfaces.exchange_connector import ExchangeConnector -from ..models.core import ConnectionStatus, OrderBookSnapshot, TradeEvent +from datetime import datetime, timedelta +from ..models.core import ConnectionStatus, OrderBookSnapshot, TradeEvent from ..utils.logging import get_logger, set_correlation_id from ..utils.exceptions import ConnectionError, ValidationError from ..utils.timing import get_current_timestamp diff --git a/COBY/replay/__init__.py b/COBY/replay/__init__.py new file mode 100644 index 0000000..0276859 --- /dev/null +++ b/COBY/replay/__init__.py @@ -0,0 +1,8 @@ +""" +Historical data replay system for the COBY multi-exchange data aggregation system. +Provides configurable playback of historical market data with session management. +""" + +from .replay_manager import HistoricalReplayManager + +__all__ = ['HistoricalReplayManager'] \ No newline at end of file diff --git a/COBY/replay/replay_manager.py b/COBY/replay/replay_manager.py new file mode 100644 index 0000000..b7e5713 --- /dev/null +++ b/COBY/replay/replay_manager.py @@ -0,0 +1,665 @@ +""" +Historical data replay manager implementation. +Provides configurable playback of historical market data with session management. 
+""" + +import asyncio +import uuid +import logging +from datetime import datetime, timedelta +from typing import Dict, List, Optional, Callable, Any, Union +from dataclasses import replace + +from ..interfaces.replay_manager import ReplayManager +from ..models.core import ReplaySession, ReplayStatus, OrderBookSnapshot, TradeEvent +from ..storage.storage_manager import StorageManager +from ..utils.logging import get_logger, set_correlation_id +from ..utils.exceptions import ReplayError, ValidationError +from ..utils.timing import get_current_timestamp +from ..config import Config + +logger = get_logger(__name__) + + +class HistoricalReplayManager(ReplayManager): + """ + Implementation of historical data replay functionality. + + Provides: + - Session-based replay management + - Configurable playback speeds + - Real-time data streaming + - Session controls (start/pause/stop/seek) + - Data filtering by symbol and exchange + """ + + def __init__(self, storage_manager: StorageManager, config: Config): + """ + Initialize replay manager. 
+ + Args: + storage_manager: Storage manager for data access + config: System configuration + """ + self.storage_manager = storage_manager + self.config = config + + # Session management + self.sessions: Dict[str, ReplaySession] = {} + self.session_tasks: Dict[str, asyncio.Task] = {} + self.session_callbacks: Dict[str, Dict[str, List[Callable]]] = {} + + # Performance tracking + self.stats = { + 'sessions_created': 0, + 'sessions_completed': 0, + 'sessions_failed': 0, + 'total_events_replayed': 0, + 'avg_replay_speed': 0.0 + } + + logger.info("Historical replay manager initialized") + + def create_replay_session(self, start_time: datetime, end_time: datetime, + speed: float = 1.0, symbols: Optional[List[str]] = None, + exchanges: Optional[List[str]] = None) -> str: + """Create a new replay session.""" + try: + set_correlation_id() + + # Validate parameters + validation_errors = self.validate_replay_request(start_time, end_time, symbols, exchanges) + if validation_errors: + raise ValidationError(f"Invalid replay request: {', '.join(validation_errors)}") + + # Generate session ID + session_id = str(uuid.uuid4()) + + # Create session + session = ReplaySession( + session_id=session_id, + start_time=start_time, + end_time=end_time, + current_time=start_time, + speed=speed, + status=ReplayStatus.CREATED, + symbols=symbols or [], + exchanges=exchanges or [], + created_at=get_current_timestamp(), + events_replayed=0, + total_events=0, + progress=0.0 + ) + + # Store session + self.sessions[session_id] = session + self.session_callbacks[session_id] = { + 'data': [], + 'status': [] + } + + self.stats['sessions_created'] += 1 + + logger.info(f"Created replay session {session_id} for {start_time} to {end_time}") + return session_id + + except Exception as e: + logger.error(f"Failed to create replay session: {e}") + raise ReplayError(f"Session creation failed: {e}") + + async def start_replay(self, session_id: str) -> None: + """Start replay session.""" + try: + 
set_correlation_id() + + if session_id not in self.sessions: + raise ReplayError(f"Session {session_id} not found") + + session = self.sessions[session_id] + + if session.status == ReplayStatus.RUNNING: + logger.warning(f"Session {session_id} is already running") + return + + # Update session status + session.status = ReplayStatus.RUNNING + session.started_at = get_current_timestamp() + + # Notify status callbacks + await self._notify_status_callbacks(session_id, ReplayStatus.RUNNING) + + # Start replay task + task = asyncio.create_task(self._replay_task(session_id)) + self.session_tasks[session_id] = task + + logger.info(f"Started replay session {session_id}") + + except Exception as e: + logger.error(f"Failed to start replay session {session_id}: {e}") + await self._set_session_error(session_id, str(e)) + raise ReplayError(f"Failed to start replay: {e}") + + async def pause_replay(self, session_id: str) -> None: + """Pause replay session.""" + try: + if session_id not in self.sessions: + raise ReplayError(f"Session {session_id} not found") + + session = self.sessions[session_id] + + if session.status != ReplayStatus.RUNNING: + logger.warning(f"Session {session_id} is not running") + return + + # Update session status + session.status = ReplayStatus.PAUSED + session.paused_at = get_current_timestamp() + + # Cancel replay task + if session_id in self.session_tasks: + self.session_tasks[session_id].cancel() + del self.session_tasks[session_id] + + # Notify status callbacks + await self._notify_status_callbacks(session_id, ReplayStatus.PAUSED) + + logger.info(f"Paused replay session {session_id}") + + except Exception as e: + logger.error(f"Failed to pause replay session {session_id}: {e}") + raise ReplayError(f"Failed to pause replay: {e}") + + async def resume_replay(self, session_id: str) -> None: + """Resume paused replay session.""" + try: + if session_id not in self.sessions: + raise ReplayError(f"Session {session_id} not found") + + session = 
self.sessions[session_id] + + if session.status != ReplayStatus.PAUSED: + logger.warning(f"Session {session_id} is not paused") + return + + # Resume from current position + await self.start_replay(session_id) + + logger.info(f"Resumed replay session {session_id}") + + except Exception as e: + logger.error(f"Failed to resume replay session {session_id}: {e}") + raise ReplayError(f"Failed to resume replay: {e}") + + async def stop_replay(self, session_id: str) -> None: + """Stop replay session.""" + try: + if session_id not in self.sessions: + raise ReplayError(f"Session {session_id} not found") + + session = self.sessions[session_id] + + # Update session status + session.status = ReplayStatus.STOPPED + session.stopped_at = get_current_timestamp() + + # Cancel replay task + if session_id in self.session_tasks: + self.session_tasks[session_id].cancel() + try: + await self.session_tasks[session_id] + except asyncio.CancelledError: + pass + del self.session_tasks[session_id] + + # Notify status callbacks + await self._notify_status_callbacks(session_id, ReplayStatus.STOPPED) + + logger.info(f"Stopped replay session {session_id}") + + except Exception as e: + logger.error(f"Failed to stop replay session {session_id}: {e}") + raise ReplayError(f"Failed to stop replay: {e}") + + def get_replay_status(self, session_id: str) -> Optional[ReplaySession]: + """Get replay session status.""" + return self.sessions.get(session_id) + + def list_replay_sessions(self) -> List[ReplaySession]: + """List all replay sessions.""" + return list(self.sessions.values()) + + def delete_replay_session(self, session_id: str) -> bool: + """Delete replay session.""" + try: + if session_id not in self.sessions: + return False + + # Stop session if running + if self.sessions[session_id].status == ReplayStatus.RUNNING: + asyncio.create_task(self.stop_replay(session_id)) + + # Clean up + del self.sessions[session_id] + if session_id in self.session_callbacks: + del self.session_callbacks[session_id] 
+ + logger.info(f"Deleted replay session {session_id}") + return True + + except Exception as e: + logger.error(f"Failed to delete replay session {session_id}: {e}") + return False + + def set_replay_speed(self, session_id: str, speed: float) -> bool: + """Change replay speed for active session.""" + try: + if session_id not in self.sessions: + return False + + if speed <= 0: + raise ValueError("Speed must be positive") + + session = self.sessions[session_id] + session.speed = speed + + logger.info(f"Set replay speed to {speed}x for session {session_id}") + return True + + except Exception as e: + logger.error(f"Failed to set replay speed for session {session_id}: {e}") + return False + + def seek_replay(self, session_id: str, timestamp: datetime) -> bool: + """Seek to specific timestamp in replay.""" + try: + if session_id not in self.sessions: + return False + + session = self.sessions[session_id] + + # Validate timestamp is within session range + if timestamp < session.start_time or timestamp > session.end_time: + logger.warning(f"Seek timestamp {timestamp} outside session range") + return False + + # Update current time + session.current_time = timestamp + + # Recalculate progress + total_duration = (session.end_time - session.start_time).total_seconds() + elapsed_duration = (timestamp - session.start_time).total_seconds() + session.progress = elapsed_duration / total_duration if total_duration > 0 else 0.0 + + logger.info(f"Seeked to {timestamp} in session {session_id}") + return True + + except Exception as e: + logger.error(f"Failed to seek in session {session_id}: {e}") + return False + + def add_data_callback(self, session_id: str, callback: Callable) -> bool: + """Add callback for replay data.""" + try: + if session_id not in self.session_callbacks: + return False + + self.session_callbacks[session_id]['data'].append(callback) + logger.debug(f"Added data callback for session {session_id}") + return True + + except Exception as e: + logger.error(f"Failed 
to add data callback for session {session_id}: {e}") + return False + + def remove_data_callback(self, session_id: str, callback: Callable) -> bool: + """Remove data callback from replay session.""" + try: + if session_id not in self.session_callbacks: + return False + + callbacks = self.session_callbacks[session_id]['data'] + if callback in callbacks: + callbacks.remove(callback) + logger.debug(f"Removed data callback for session {session_id}") + return True + + return False + + except Exception as e: + logger.error(f"Failed to remove data callback for session {session_id}: {e}") + return False + + def add_status_callback(self, session_id: str, callback: Callable) -> bool: + """Add callback for replay status changes.""" + try: + if session_id not in self.session_callbacks: + return False + + self.session_callbacks[session_id]['status'].append(callback) + logger.debug(f"Added status callback for session {session_id}") + return True + + except Exception as e: + logger.error(f"Failed to add status callback for session {session_id}: {e}") + return False + + async def get_available_data_range(self, symbol: str, + exchange: Optional[str] = None) -> Optional[Dict[str, datetime]]: + """Get available data time range for replay.""" + try: + # Query database for data range + if exchange: + query = """ + SELECT + MIN(timestamp) as start_time, + MAX(timestamp) as end_time + FROM order_book_snapshots + WHERE symbol = $1 AND exchange = $2 + """ + result = await self.storage_manager.connection_pool.fetchrow(query, symbol, exchange) + else: + query = """ + SELECT + MIN(timestamp) as start_time, + MAX(timestamp) as end_time + FROM order_book_snapshots + WHERE symbol = $1 + """ + result = await self.storage_manager.connection_pool.fetchrow(query, symbol) + + if result and result['start_time'] and result['end_time']: + return { + 'start': result['start_time'], + 'end': result['end_time'] + } + + return None + + except Exception as e: + logger.error(f"Failed to get data range for 
{symbol}: {e}") + return None + + def validate_replay_request(self, start_time: datetime, end_time: datetime, + symbols: Optional[List[str]] = None, + exchanges: Optional[List[str]] = None) -> List[str]: + """Validate replay request parameters.""" + errors = [] + + # Validate time range + if start_time >= end_time: + errors.append("Start time must be before end time") + + # Check if time range is too large (more than 30 days) + if (end_time - start_time).days > 30: + errors.append("Time range cannot exceed 30 days") + + # Check if start time is too far in the past (more than 1 year) + if (get_current_timestamp() - start_time).days > 365: + errors.append("Start time cannot be more than 1 year ago") + + # Validate symbols + if symbols: + for symbol in symbols: + if not symbol or len(symbol) < 3: + errors.append(f"Invalid symbol: {symbol}") + + # Validate exchanges + if exchanges: + valid_exchanges = self.config.exchanges.exchanges + for exchange in exchanges: + if exchange not in valid_exchanges: + errors.append(f"Unsupported exchange: {exchange}") + + return errors + + async def _replay_task(self, session_id: str) -> None: + """Main replay task that streams historical data.""" + try: + session = self.sessions[session_id] + + # Calculate total events for progress tracking + await self._calculate_total_events(session_id) + + # Stream data + await self._stream_historical_data(session_id) + + # Mark as completed + session.status = ReplayStatus.COMPLETED + session.completed_at = get_current_timestamp() + session.progress = 1.0 + + await self._notify_status_callbacks(session_id, ReplayStatus.COMPLETED) + self.stats['sessions_completed'] += 1 + + logger.info(f"Completed replay session {session_id}") + + except asyncio.CancelledError: + logger.info(f"Replay session {session_id} was cancelled") + except Exception as e: + logger.error(f"Replay task failed for session {session_id}: {e}") + await self._set_session_error(session_id, str(e)) + self.stats['sessions_failed'] += 1 + 
+ async def _calculate_total_events(self, session_id: str) -> None: + """Calculate total number of events for progress tracking.""" + try: + session = self.sessions[session_id] + + # Build query conditions + conditions = ["timestamp >= $1", "timestamp <= $2"] + params = [session.start_time, session.end_time] + param_count = 2 + + if session.symbols: + param_count += 1 + conditions.append(f"symbol = ANY(${param_count})") + params.append(session.symbols) + + if session.exchanges: + param_count += 1 + conditions.append(f"exchange = ANY(${param_count})") + params.append(session.exchanges) + + where_clause = " AND ".join(conditions) + + # Count order book events + orderbook_query = f""" + SELECT COUNT(*) FROM order_book_snapshots + WHERE {where_clause} + """ + orderbook_count = await self.storage_manager.connection_pool.fetchval( + orderbook_query, *params + ) + + # Count trade events + trade_query = f""" + SELECT COUNT(*) FROM trade_events + WHERE {where_clause} + """ + trade_count = await self.storage_manager.connection_pool.fetchval( + trade_query, *params + ) + + session.total_events = (orderbook_count or 0) + (trade_count or 0) + + logger.debug(f"Session {session_id} has {session.total_events} total events") + + except Exception as e: + logger.error(f"Failed to calculate total events for session {session_id}: {e}") + session.total_events = 0 + + async def _stream_historical_data(self, session_id: str) -> None: + """Stream historical data for replay session.""" + session = self.sessions[session_id] + + # Build query conditions + conditions = ["timestamp >= $1", "timestamp <= $2"] + params = [session.current_time, session.end_time] + param_count = 2 + + if session.symbols: + param_count += 1 + conditions.append(f"symbol = ANY(${param_count})") + params.append(session.symbols) + + if session.exchanges: + param_count += 1 + conditions.append(f"exchange = ANY(${param_count})") + params.append(session.exchanges) + + where_clause = " AND ".join(conditions) + + # Query 
both order book and trade data, ordered by timestamp + query = f""" + ( + SELECT 'orderbook' as type, timestamp, symbol, exchange, + bids, asks, sequence_id, mid_price, spread, bid_volume, ask_volume, + NULL as price, NULL as size, NULL as side, NULL as trade_id + FROM order_book_snapshots + WHERE {where_clause} + ) + UNION ALL + ( + SELECT 'trade' as type, timestamp, symbol, exchange, + NULL as bids, NULL as asks, NULL as sequence_id, + NULL as mid_price, NULL as spread, NULL as bid_volume, NULL as ask_volume, + price, size, side, trade_id + FROM trade_events + WHERE {where_clause} + ) + ORDER BY timestamp ASC + """ + + # Stream data in chunks + chunk_size = 1000 + offset = 0 + last_timestamp = session.current_time + + while session.status == ReplayStatus.RUNNING: + # Fetch chunk + chunk_query = f"{query} LIMIT {chunk_size} OFFSET {offset}" + rows = await self.storage_manager.connection_pool.fetch(chunk_query, *params) + + if not rows: + break + + # Process each row + for row in rows: + if session.status != ReplayStatus.RUNNING: + break + + # Calculate delay based on replay speed + if last_timestamp < row['timestamp']: + time_diff = (row['timestamp'] - last_timestamp).total_seconds() + delay = time_diff / session.speed + + if delay > 0: + await asyncio.sleep(delay) + + # Create data object + if row['type'] == 'orderbook': + data = await self._create_orderbook_from_row(row) + else: + data = await self._create_trade_from_row(row) + + # Notify data callbacks + await self._notify_data_callbacks(session_id, data) + + # Update session progress + session.events_replayed += 1 + session.current_time = row['timestamp'] + + if session.total_events > 0: + session.progress = session.events_replayed / session.total_events + + last_timestamp = row['timestamp'] + self.stats['total_events_replayed'] += 1 + + offset += chunk_size + + async def _create_orderbook_from_row(self, row: Dict) -> OrderBookSnapshot: + """Create OrderBookSnapshot from database row.""" + import json + from 
..models.core import PriceLevel + + # Parse bids and asks from JSON + bids_data = json.loads(row['bids']) if row['bids'] else [] + asks_data = json.loads(row['asks']) if row['asks'] else [] + + bids = [PriceLevel(price=b['price'], size=b['size'], count=b.get('count')) + for b in bids_data] + asks = [PriceLevel(price=a['price'], size=a['size'], count=a.get('count')) + for a in asks_data] + + return OrderBookSnapshot( + symbol=row['symbol'], + exchange=row['exchange'], + timestamp=row['timestamp'], + bids=bids, + asks=asks, + sequence_id=row['sequence_id'] + ) + + async def _create_trade_from_row(self, row: Dict) -> TradeEvent: + """Create TradeEvent from database row.""" + return TradeEvent( + symbol=row['symbol'], + exchange=row['exchange'], + timestamp=row['timestamp'], + price=float(row['price']), + size=float(row['size']), + side=row['side'], + trade_id=row['trade_id'] + ) + + async def _notify_data_callbacks(self, session_id: str, + data: Union[OrderBookSnapshot, TradeEvent]) -> None: + """Notify all data callbacks for a session.""" + if session_id in self.session_callbacks: + callbacks = self.session_callbacks[session_id]['data'] + for callback in callbacks: + try: + if asyncio.iscoroutinefunction(callback): + await callback(data) + else: + callback(data) + except Exception as e: + logger.error(f"Data callback error for session {session_id}: {e}") + + async def _notify_status_callbacks(self, session_id: str, status: ReplayStatus) -> None: + """Notify all status callbacks for a session.""" + if session_id in self.session_callbacks: + callbacks = self.session_callbacks[session_id]['status'] + for callback in callbacks: + try: + if asyncio.iscoroutinefunction(callback): + await callback(session_id, status) + else: + callback(session_id, status) + except Exception as e: + logger.error(f"Status callback error for session {session_id}: {e}") + + async def _set_session_error(self, session_id: str, error_message: str) -> None: + """Set session to error state.""" + if 
session_id in self.sessions: + session = self.sessions[session_id] + session.status = ReplayStatus.ERROR + session.error_message = error_message + session.stopped_at = get_current_timestamp() + + await self._notify_status_callbacks(session_id, ReplayStatus.ERROR) + + def get_stats(self) -> Dict[str, Any]: + """Get replay manager statistics.""" + active_sessions = sum(1 for s in self.sessions.values() + if s.status == ReplayStatus.RUNNING) + + return { + **self.stats, + 'active_sessions': active_sessions, + 'total_sessions': len(self.sessions), + 'session_statuses': { + status.value: sum(1 for s in self.sessions.values() if s.status == status) + for status in ReplayStatus + } + } \ No newline at end of file diff --git a/COBY/tests/test_replay_system.py b/COBY/tests/test_replay_system.py new file mode 100644 index 0000000..2d3941b --- /dev/null +++ b/COBY/tests/test_replay_system.py @@ -0,0 +1,153 @@ +""" +Test script for the historical data replay system. +""" + +import asyncio +import logging +from datetime import datetime, timedelta + +from ..config import Config +from ..storage.storage_manager import StorageManager +from ..replay.replay_manager import HistoricalReplayManager +from ..models.core import ReplayStatus + +# Set up logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def test_replay_system(): + """Test the replay system functionality.""" + + # Initialize components + config = Config() + storage_manager = StorageManager(config) + + try: + # Initialize storage + logger.info("Initializing storage manager...") + await storage_manager.initialize() + + # Initialize replay manager + logger.info("Initializing replay manager...") + replay_manager = HistoricalReplayManager(storage_manager, config) + + # Test data range query + logger.info("Testing data range query...") + data_range = await replay_manager.get_available_data_range("BTCUSDT") + if data_range: + logger.info(f"Available data range: {data_range['start']} to 
{data_range['end']}") + else: + logger.warning("No data available for BTCUSDT") + return + + # Create test replay session + logger.info("Creating replay session...") + start_time = data_range['start'] + end_time = start_time + timedelta(minutes=5) # 5 minute replay + + session_id = replay_manager.create_replay_session( + start_time=start_time, + end_time=end_time, + speed=10.0, # 10x speed + symbols=["BTCUSDT"], + exchanges=["binance"] + ) + + logger.info(f"Created session: {session_id}") + + # Add data callback + data_count = 0 + + def data_callback(data): + nonlocal data_count + data_count += 1 + if data_count % 100 == 0: + logger.info(f"Received {data_count} data points") + + replay_manager.add_data_callback(session_id, data_callback) + + # Add status callback + def status_callback(session_id, status): + logger.info(f"Session {session_id} status changed to: {status.value}") + + replay_manager.add_status_callback(session_id, status_callback) + + # Start replay + logger.info("Starting replay...") + await replay_manager.start_replay(session_id) + + # Monitor progress + while True: + session = replay_manager.get_replay_status(session_id) + if not session: + break + + if session.status in [ReplayStatus.COMPLETED, ReplayStatus.ERROR, ReplayStatus.STOPPED]: + break + + logger.info(f"Progress: {session.progress:.2%}, Events: {session.events_replayed}") + await asyncio.sleep(2) + + # Final status + final_session = replay_manager.get_replay_status(session_id) + if final_session: + logger.info(f"Final status: {final_session.status.value}") + logger.info(f"Total events replayed: {final_session.events_replayed}") + logger.info(f"Total data callbacks: {data_count}") + + # Test session controls + logger.info("Testing session controls...") + + # Create another session for control testing + control_session_id = replay_manager.create_replay_session( + start_time=start_time, + end_time=end_time, + speed=1.0, + symbols=["BTCUSDT"] + ) + + # Start and immediately pause + await 
replay_manager.start_replay(control_session_id) + await asyncio.sleep(1) + await replay_manager.pause_replay(control_session_id) + + # Test seek + seek_time = start_time + timedelta(minutes=2) + success = replay_manager.seek_replay(control_session_id, seek_time) + logger.info(f"Seek to {seek_time}: {'success' if success else 'failed'}") + + # Test speed change + success = replay_manager.set_replay_speed(control_session_id, 5.0) + logger.info(f"Speed change to 5x: {'success' if success else 'failed'}") + + # Resume and stop + await replay_manager.resume_replay(control_session_id) + await asyncio.sleep(2) + await replay_manager.stop_replay(control_session_id) + + # Get statistics + stats = replay_manager.get_stats() + logger.info(f"Replay manager stats: {stats}") + + # List all sessions + sessions = replay_manager.list_replay_sessions() + logger.info(f"Total sessions: {len(sessions)}") + + # Clean up + for session in sessions: + replay_manager.delete_replay_session(session.session_id) + + logger.info("Replay system test completed successfully!") + + except Exception as e: + logger.error(f"Test failed: {e}") + raise + + finally: + # Clean up + await storage_manager.close() + + +if __name__ == "__main__": + asyncio.run(test_replay_system()) \ No newline at end of file