311 lines
11 KiB
Python
311 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
COBY Multi-Exchange Data Aggregation System
|
|
Main application entry point for Docker deployment
|
|
"""
|
|
|
|
import asyncio
|
|
import signal
|
|
import argparse
|
|
from typing import Optional
|
|
|
|
# Add current directory to path for imports
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from utils.logging import get_logger, setup_logging
|
|
from simple_config import Config
|
|
from monitoring.metrics_collector import metrics_collector
|
|
from monitoring.performance_monitor import get_performance_monitor
|
|
from monitoring.memory_monitor import memory_monitor
|
|
from api.rest_api import create_app
|
|
from api.websocket_server import websocket_manager
|
|
|
|
# Module-level logger used by everything in this entry-point file.
logger = get_logger(__name__)


# Most recently constructed COBYApplication. It is assigned inside
# COBYApplication.__init__ and handed out by get_app_instance(); it stays
# None until an application object has been created.
_app_instance: Optional["COBYApplication"] = None
|
|
|
|
class COBYApplication:
    """Main COBY application orchestrator.

    Starts the monitoring systems, the REST API server, the exchange
    connectors and the data aggregation loop as asyncio tasks, and tears
    them down again in ``stop()``.

    Attributes:
        config: The ``Config`` object passed to the constructor.
        running: True between a successful start-up and a completed stop.
        tasks: Background ``asyncio.Task`` objects created by ``start()``.
        websocket_manager: The shared WebSocket manager used for broadcasts.
        connectors: Mapping of exchange name to connector instance.
    """

    # Symbols every connector subscribes to right after connecting.
    DEFAULT_SYMBOLS = ('BTCUSDT', 'ETHUSDT', 'ADAUSDT', 'SOLUSDT')

    def __init__(self, config: Config):
        """Store the configuration and register this object as the global instance."""
        global _app_instance
        self.config = config
        self.running = False
        self.tasks = []
        self.websocket_manager = websocket_manager
        self.connectors = {}
        # True as soon as start() begins acquiring resources. stop() keys off
        # this flag (not `running`) so that a start-up that failed half way
        # is still cleaned up.
        self._started = False
        # Created lazily inside stop() so the lock is always built while an
        # event loop is running.
        self._stop_lock = None
        _app_instance = self

    async def start(self):
        """Start all application components and wait for the background tasks.

        Returns only when every task in ``self.tasks`` has finished (normally
        because ``stop()`` cancelled them).

        Raises:
            Exception: Any error raised during start-up is logged and re-raised.
        """
        # From this point on there may be monitors/tasks that stop() must undo.
        self._started = True
        try:
            logger.info("Starting COBY Multi-Exchange Data Aggregation System")

            # Start monitoring systems
            logger.info("Starting monitoring systems...")
            metrics_collector.start_collection()
            get_performance_monitor().start_monitoring()
            memory_monitor.start_monitoring()

            # WebSocket server is handled by FastAPI
            logger.info("WebSocket manager initialized")

            # Start REST API server (includes static file serving)
            logger.info("Starting REST API server with static file serving...")
            app = create_app(self.config)
            api_task = asyncio.create_task(
                self._run_api_server(app, self.config.api.host, self.config.api.port)
            )
            self.tasks.append(api_task)

            # Start exchange connectors
            logger.info("Starting exchange connectors...")
            await self._start_exchange_connectors()

            # Start data processing pipeline
            logger.info("Starting data processing pipeline...")
            await self._start_data_processing()

            self.running = True
            logger.info("COBY system started successfully")

            # Wait for all tasks. Exceptions are collected instead of raised so
            # one failing task does not abort the wait for the others; they are
            # logged afterwards so they do not disappear silently.
            results = await asyncio.gather(*self.tasks, return_exceptions=True)
            self._log_task_failures(results)

        except Exception as e:
            logger.error(f"Error starting COBY application: {e}")
            raise

    @staticmethod
    def _log_task_failures(results):
        """Log every real exception found in a ``gather(return_exceptions=True)`` result."""
        for result in results:
            # Cancellation is the normal shutdown path, not a failure.
            if isinstance(result, asyncio.CancelledError):
                continue
            if isinstance(result, Exception):
                logger.error(f"Background task failed: {result!r}")

    async def stop(self):
        """Stop all application components.

        Safe to call when start-up never happened (no-op), when start-up
        failed half way (cleans up whatever was started), and from several
        callers at once (later callers wait for the first one and then
        return). Errors during shutdown are logged, not raised.
        """
        if not self._started:
            return

        if self._stop_lock is None:
            self._stop_lock = asyncio.Lock()

        async with self._stop_lock:
            # Another caller may have completed the shutdown while we waited.
            if not self._started:
                return

            logger.info("Stopping COBY Multi-Exchange Data Aggregation System")

            try:
                # Stop exchange connectors; one failing connector must not
                # prevent the others from being disconnected.
                for name, connector in self.connectors.items():
                    try:
                        logger.info(f"Stopping {name} connector...")
                        await connector.disconnect()
                    except Exception as e:
                        logger.error(f"Error stopping {name} connector: {e}")

                # WebSocket connections will be closed automatically

                # Cancel all tasks
                for task in self.tasks:
                    if not task.done():
                        task.cancel()

                # Wait for tasks to complete
                if self.tasks:
                    await asyncio.gather(*self.tasks, return_exceptions=True)

                # Stop monitoring systems (reverse of start order)
                memory_monitor.stop_monitoring()
                get_performance_monitor().stop_monitoring()
                metrics_collector.stop_collection()

                self.running = False
                # Only a completed shutdown clears the flag, so a failed
                # attempt can be retried by calling stop() again.
                self._started = False
                logger.info("COBY system stopped successfully")

            except Exception as e:
                logger.error(f"Error stopping COBY application: {e}")

    async def _start_exchange_connectors(self):
        """Create the exchange connectors and schedule one task per connector.

        Failures are logged and swallowed so the rest of the application can
        still start without exchange data.
        """
        try:
            # Import real exchange connectors
            from connectors.binance_connector import BinanceConnector

            # Initialize real exchange connectors
            self.connectors = {
                'binance': BinanceConnector()
            }

            # Start connectors
            for name, connector in self.connectors.items():
                try:
                    logger.info(f"Starting {name} connector...")
                    # Set up data callback to broadcast to WebSocket
                    connector.add_data_callback(self._handle_connector_data)
                    connector.add_status_callback(self._handle_connector_status)

                    connector_task = asyncio.create_task(self._run_connector(connector))
                    self.tasks.append(connector_task)
                except Exception as e:
                    logger.error(f"Failed to start {name} connector: {e}")

        except Exception as e:
            logger.error(f"Error starting exchange connectors: {e}")

    async def _run_connector(self, connector):
        """Connect one connector, subscribe to the default symbols and keep it alive.

        The coroutine polls ``connector.is_connected`` once per second and
        returns when the connector reports that it is no longer connected.
        """
        try:
            # Connect to exchange
            if await connector.connect():
                logger.info(f"Connected to {connector.exchange_name}")

                # Subscribe to default symbols; a failure for one symbol only
                # produces a warning and the remaining symbols are still tried.
                for symbol in self.DEFAULT_SYMBOLS:
                    try:
                        await connector.subscribe_orderbook(symbol)
                        await connector.subscribe_trades(symbol)
                        logger.info(f"Subscribed to {symbol} on {connector.exchange_name}")
                    except Exception as e:
                        logger.warning(f"Failed to subscribe to {symbol} on {connector.exchange_name}: {e}")

                # Keep connector running
                while connector.is_connected:
                    await asyncio.sleep(1)

            else:
                logger.error(f"Failed to connect to {connector.exchange_name}")

        except Exception as e:
            logger.error(f"Error running {connector.exchange_name} connector: {e}")

    async def _handle_connector_data(self, data_type: str, data):
        """Broadcast order book and trade updates to WebSocket subscribers.

        Args:
            data_type: ``'orderbook'`` or ``'trade'``; any other value is ignored.
            data: Payload object; its ``symbol`` attribute selects the channel.
        """
        try:
            # Both supported kinds are forwarded the same way, keyed by symbol.
            if data_type in ('orderbook', 'trade'):
                await self.websocket_manager.broadcast_update(
                    data.symbol, data_type, data
                )
                logger.debug(f"Broadcasted {data_type} data for {data.symbol}")

        except Exception as e:
            logger.error(f"Error handling connector data: {e}")

    def _handle_connector_status(self, exchange_name: str, status):
        """Log a status change reported by a connector.

        Args:
            exchange_name: Name of the exchange whose status changed.
            status: Status object; its ``value`` attribute is logged.
        """
        try:
            logger.info(f"Connector {exchange_name} status: {status.value}")
            # Could broadcast status updates to dashboard here

        except Exception as e:
            logger.error(f"Error handling connector status: {e}")

    async def _start_data_processing(self):
        """Schedule the data aggregation loop as a background task."""
        try:
            # Start data aggregation task
            aggregation_task = asyncio.create_task(self._run_data_aggregation())
            self.tasks.append(aggregation_task)

            logger.info("Data processing pipeline started")

        except Exception as e:
            logger.error(f"Error starting data processing pipeline: {e}")

    async def _run_data_aggregation(self):
        """Run the (placeholder) aggregation loop while ``self.running`` is True."""
        try:
            while self.running:
                # Placeholder for data aggregation logic
                # This would collect data from connectors and process it
                await asyncio.sleep(5)

                # Log status
                logger.debug("Data aggregation tick - simple data generator running")

        except Exception as e:
            logger.error(f"Error in data aggregation: {e}")

    async def _run_api_server(self, app, host: str, port: int):
        """Serve ``app`` with uvicorn on ``host``:``port`` until the server exits.

        If uvicorn cannot be imported, an error is logged and the coroutine
        simply sleeps for an hour so the task stays alive.
        """
        try:
            # Import here to avoid circular imports
            import uvicorn

            config = uvicorn.Config(
                app,
                host=host,
                port=port,
                log_level="info",
                access_log=True
            )
            server = uvicorn.Server(config)
            await server.serve()

        except ImportError:
            logger.error("uvicorn not available, falling back to basic server")
            # Fallback implementation would go here
            await asyncio.sleep(3600)  # Keep running for an hour
|
|
|
|
|
|
async def main():
    """Main application entry point.

    Parses the command line, configures logging, builds the
    ``COBYApplication``, installs SIGINT/SIGTERM handlers that trigger a
    graceful shutdown, and runs the application until it stops.

    Exits the process with status 1 if the application raises.
    """
    parser = argparse.ArgumentParser(description='COBY Multi-Exchange Data Aggregation System')
    parser.add_argument('--debug', action='store_true', help='Enable debug mode')
    parser.add_argument('--reload', action='store_true', help='Enable auto-reload (development)')
    parser.add_argument('--config', type=str, help='Configuration file path')

    args = parser.parse_args()

    # Setup logging
    log_level = 'DEBUG' if args.debug else 'INFO'
    setup_logging(level=log_level)

    # Load configuration
    # NOTE(review): --config and --reload are accepted but not used anywhere
    # in this function; Config() is built without the given path. Confirm
    # whether Config should receive args.config.
    config = Config()
    if args.debug:
        config.debug = True
        config.logging.level = 'DEBUG'

    # Create and start application
    app = COBYApplication(config)

    # Setup signal handlers
    loop = asyncio.get_running_loop()
    # The event loop keeps only weak references to tasks, so hold the
    # shutdown task here until it has finished.
    shutdown_tasks = set()

    def request_shutdown(signum):
        """Schedule app.stop() on the event loop in response to a signal."""
        logger.info(f"Received signal {signum}, shutting down...")
        stop_task = loop.create_task(app.stop())
        shutdown_tasks.add(stop_task)
        stop_task.add_done_callback(shutdown_tasks.discard)

    def fallback_handler(signum, frame):
        """signal.signal-style handler: hand over to the loop and wake it up."""
        loop.call_soon_threadsafe(request_shutdown, signum)

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # Preferred: the callback runs inside the event loop, where
            # creating tasks is safe.
            loop.add_signal_handler(sig, request_shutdown, sig)
        except NotImplementedError:
            # Event loops without add_signal_handler support (e.g. on Windows).
            signal.signal(sig, fallback_handler)

    try:
        await app.start()
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt, shutting down...")
    except Exception as e:
        logger.error(f"Application error: {e}")
        sys.exit(1)
    finally:
        # Runs on every exit path, including the sys.exit above.
        await app.stop()
|
|
|
|
|
|
def get_app_instance():
    """Return the module-level application instance.

    The value is whatever ``_app_instance`` currently holds: the most
    recently constructed ``COBYApplication``, or ``None`` if none has been
    created yet.
    """
    current_instance = _app_instance
    return current_instance
|
|
|
|
|
|
if __name__ == "__main__":
    # Switch the working directory to the folder that contains this script.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    os.chdir(script_dir)

    # Run the application and translate the outcome into an exit status.
    exit_status = 0
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nShutdown complete")
    except Exception as fatal:
        print(f"Fatal error: {fatal}")
        exit_status = 1

    if exit_status != 0:
        sys.exit(exit_status)